SolarFM/plugins/file_properties/plugin.py

250 lines
9.4 KiB
Python

# Python imports
import os
import threading
import subprocess
import time
import pwd
import grp
from datetime import datetime
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
from gi.repository import Gio
# Application imports
from plugins.plugin_base import PluginBase
# NOTE: Threads WILL NOT die with parent's destruction.
def threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=False).start()
return wrapper
# NOTE: Threads WILL die with parent's destruction.
def daemon_threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True).start()
return wrapper
class Properties:
file_uri: str = None
file_name: str = None
file_location: str = None
file_target: str = None
mime_type: str = None
file_size: str = None
mtime: int = None
atime: int = None
file_owner: str = None
file_group: str = None
chmod_stat: str = None
class Plugin(PluginBase):
def __init__(self):
super().__init__()
self.path = os.path.dirname(os.path.realpath(__file__))
self._GLADE_FILE = f"{self.path}/file_properties.glade"
self.name = "Properties" # NOTE: Need to remove after establishing private bidirectional 1-1 message bus
# where self.name should not be needed for message comms
self._properties_dialog = None
self._file_name = None
self._file_location = None
self._file_target = None
self._mime_type = None
self._file_size = None
self._mtime = None
self._atime = None
self._file_owner = None
self._file_group = None
self._chmod_map: {} = {
"7": "rwx",
"6": "rw",
"5": "rx",
"4": "r",
"3": "wx",
"2": "w",
"1": "x",
"0": ""
}
self._chmod_map_counter: {} = {
"rwx": "7",
"rw": "6",
"rx": "5",
"r": "4",
"wx": "3",
"w": "2",
"x": "1",
"": "0"
}
def run(self):
self._builder = Gtk.Builder()
self._builder.add_from_file(self._GLADE_FILE)
self._properties_dialog = self._builder.get_object("file_properties_dialog")
self._file_name = self._builder.get_object("file_name")
self._file_location = self._builder.get_object("file_location")
self._file_target = self._builder.get_object("file_target")
self._mime_type = self._builder.get_object("mime_type")
self._file_size = self._builder.get_object("file_size")
self._mtime = self._builder.get_object("mtime")
self._atime = self._builder.get_object("atime")
self._file_owner = self._builder.get_object("file_owner")
self._file_group = self._builder.get_object("file_group")
def generate_reference_ui_element(self):
item = Gtk.ImageMenuItem(self.name)
item.set_image( Gtk.Image(stock=Gtk.STOCK_PROPERTIES) )
item.connect("activate", self._show_properties_page)
item.set_always_show_image(True)
return item
@threaded
def _show_properties_page(self, widget=None, eve=None):
event_system.emit("get_current_state")
state = self._fm_state
self._event_message = None
GLib.idle_add(self._process_changes, (state))
def _process_changes(self, state):
if len(state.uris) == 1:
uri = state.uris[0]
path = state.tab.get_current_directory()
properties = self._set_ui_data(uri, path)
response = self._properties_dialog.run()
if response in [Gtk.ResponseType.CANCEL, Gtk.ResponseType.DELETE_EVENT]:
self._properties_dialog.hide()
self._update_file(properties)
self._properties_dialog.hide()
def _update_file(self, properties):
chmod_stat = self._get_check_boxes()
if chmod_stat is not properties.chmod_stat:
try:
print("\nNew chmod flags...")
print(f"Old: {''.join(properties.chmod_stat)}")
print(f"New: {chmod_stat}")
command = ["chmod", f"{chmod_stat}", properties.file_uri]
with subprocess.Popen(command, stdout=subprocess.PIPE) as proc:
result = proc.stdout.read().decode("UTF-8").strip()
print(result)
except Exception as e:
print(f"Couldn't chmod\nFile: {properties.file_uri}")
print( repr(e) )
owner = self._file_owner.get_text()
group = self._file_group.get_text()
if owner is not properties.file_owner or group is not properties.file_group:
try:
print("\nNew owner/group flags...")
print(f"Old:\n\tOwner: {properties.file_owner}\n\tGroup: {properties.file_group}")
print(f"New:\n\tOwner: {owner}\n\tGroup: {group}")
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
os.chown(properties.file_uri, uid, gid)
except Exception as e:
print(f"Couldn't chmod\nFile: {properties.file_uri}")
print( repr(e) )
def _set_ui_data(self, uri, path):
properties = Properties()
file_info = Gio.File.new_for_path(uri).query_info(attributes="standard::*,owner::*,time::access,time::changed",
flags=Gio.FileQueryInfoFlags.NONE,
cancellable=None)
is_symlink = file_info.get_attribute_as_string("standard::is-symlink")
properties.file_uri = uri
properties.file_target = file_info.get_attribute_as_string("standard::symlink-target") if is_symlink else ""
properties.file_name = file_info.get_display_name()
properties.file_location = path
properties.mime_type = file_info.get_content_type()
properties.file_size = self._sizeof_fmt(file_info.get_size())
properties.mtime = datetime.fromtimestamp( int(file_info.get_attribute_as_string("time::changed")) ).strftime("%A, %B %d, %Y %I:%M:%S")
properties.atime = datetime.fromtimestamp( int(file_info.get_attribute_as_string("time::access")) ).strftime("%A, %B %d, %Y %I:%M:%S")
properties.file_owner = file_info.get_attribute_as_string("owner::user")
properties.file_group = file_info.get_attribute_as_string("owner::group")
# NOTE: Read = 4, Write = 2, Exec = 1
command = ["stat", "-c", "%a", uri]
with subprocess.Popen(command, stdout=subprocess.PIPE) as proc:
properties.chmod_stat = list(proc.stdout.read().decode("UTF-8").strip())
owner = self._chmod_map[f"{properties.chmod_stat[0]}"]
group = self._chmod_map[f"{properties.chmod_stat[1]}"]
others = self._chmod_map[f"{properties.chmod_stat[2]}"]
self._reset_check_boxes()
self._set_check_boxes([["owner", owner], ["group", group], ["others", others]])
self._file_name.set_text(properties.file_name)
self._file_location.set_text(properties.file_location)
self._file_target.set_text(properties.file_target)
self._mime_type.set_label(properties.mime_type)
self._file_size.set_label(properties.file_size)
self._mtime.set_text(properties.mtime)
self._atime.set_text(properties.atime)
self._file_owner.set_text(properties.file_owner)
self._file_group.set_text(properties.file_group)
return properties
def _get_check_boxes(self):
perms = [[], [], []]
for i, target in enumerate(["owner", "group", "others"]):
for type in ["r", "w", "x"]:
is_active = self._builder.get_object(f"{target}_{type}").get_active()
if is_active:
perms[i].append(type)
digits = []
for perm in perms:
digits.append(self._chmod_map_counter[ ''.join(perm) ])
return ''.join(digits)
def _set_check_boxes(self, targets):
for name, target in targets:
for type in list(target):
obj = f"{name}_{type}"
self._builder.get_object(obj).set_active(True)
def _reset_check_boxes(self):
for target in ["owner", "group", "others"]:
for type in ["r", "w", "x"]:
self._builder.get_object(f"{target}_{type}").set_active(False)
def _sizeof_fmt(self, num, suffix="B"):
for unit in ["", "K", "M", "G", "T", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return f"{num:3.1f} {unit}{suffix}"
num /= 1024.0
return f"{num:.1f} Yi{suffix}"