initial push

This commit is contained in:
itdominator 2023-05-07 01:29:59 -05:00
parent 0d8c46b59f
commit 834d7ee94b
54 changed files with 2046 additions and 593 deletions

View File

@ -1,20 +1,12 @@
# Python-With-Gtk-Template
A template project for Python with Gtk applications.
# Pulstar
Pulstar is a Python + Gtk app to control the volume levels.
### Requirements
# Requirements
* PyGObject
* setproctitle
* pyxdg
### Note
There are a "\<change_me\>" strings and files that need to be set according to your app's name located at:
* \_\_builtins\_\_.py
* user_config/bin/app_name
* user_config/usr/share/app_name
* user_config/usr/share/app_name/icons/app_name.png
* user_config/usr/share/app_name/icons/app_name-64x64.png
* user_config/usr/share/applications/app_name.desktop
# Notes
<b>Still Work in progress! Use at own risk!</b>
For the user_config, after changing names and files, copy all content to their respective destinations.
The logic follows Debian Dpkg packaging and its placement logic.
# Images
![1 Pulstar with sound sinks. ](images/pic1.png)

BIN
images/pic1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 338 KiB

View File

@ -1,2 +0,0 @@
### Note
Copy the example and rename it to your desired name. The Main class and passed in arguments are required. You don't necessarily need to use the passed in socket_id or event_system.

View File

@ -1,3 +0,0 @@
"""
Pligin Module
"""

View File

@ -1,3 +0,0 @@
"""
Pligin Package
"""

View File

@ -1,13 +0,0 @@
{
"manifest": {
"name": "Example Plugin",
"author": "John Doe",
"version": "0.0.1",
"support": "",
"requests": {
"ui_target": "plugin_control_list",
"pass_fm_events": "true",
"bind_keys": ["Example Plugin||send_message:<Control>f"]
}
}
}

View File

@ -1,51 +0,0 @@
# Python imports
import os
import threading
import subprocess
import time
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
# Application imports
from plugins.plugin_base import PluginBase
# NOTE: Threads WILL NOT die with parent's destruction.
def threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=False).start()
return wrapper
# NOTE: Threads WILL die with parent's destruction.
def daemon_threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True).start()
return wrapper
class Plugin(PluginBase):
def __init__(self):
super().__init__()
self.name = "Example Plugin" # NOTE: Need to remove after establishing private bidirectional 1-1 message bus
# where self.name should not be needed for message comms
def generate_reference_ui_element(self):
button = Gtk.Button(label=self.name)
button.connect("button-release-event", self.send_message)
return button
def run(self):
...
def send_message(self, widget=None, eve=None):
message = "Hello, World!"
event_system.emit("display_message", ("warning", message, None))

View File

@ -29,7 +29,7 @@ def daemon_threaded_wrapper(fn):
# NOTE: Just reminding myself we can add to builtins two different ways...
# __builtins__.update({"event_system": Builtins()})
builtins.app_name = "<change_me>"
builtins.app_name = "Pulstar"
builtins.keybindings = Keybindings()
builtins.event_system = EventSystem()
builtins.endpoint_registry = EndpointRegistry()

View File

@ -4,19 +4,28 @@
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
# Application imports
from utils.pulsectl import pulsectl
from ..widgets.audio_sink import AudioSink
class BaseContainer(Gtk.Box):
def __init__(self):
super(BaseContainer, self).__init__()
self._builder = settings.get_builder()
self._scroll = None
self._box = None
self.pulse = None
self.pulse_events = None
self.sink_inputs = None
self.pause_sink = False
self._setup_styling()
self._setup_signals()
self._subscribe_to_events()
self._load_widgets()
self.show_all()
@ -24,20 +33,97 @@ class BaseContainer(Gtk.Box):
def _setup_styling(self):
self.set_orientation(Gtk.Orientation.VERTICAL)
self.set_margin_top(10)
self.set_margin_bottom(10)
self.set_margin_left(10)
self.set_margin_right(10)
def _setup_signals(self):
...
def _subscribe_to_events(self):
event_system.subscribe("handle_new_sync_input", self._handle_new_sync_input)
event_system.subscribe("handle_del_sync_input", self._handle_del_sync_input)
event_system.subscribe("handle_cng_sync_input", self._handle_cng_sync_input)
def _load_widgets(self):
glade_box = self._builder.get_object("glade_box")
button = Gtk.Button(label="Click Me!")
scroll = Gtk.ScrolledWindow()
viewport = Gtk.Viewport()
box = Gtk.Box()
scroll.add(viewport)
viewport.add(box)
button.connect("clicked", self._hello_world)
try:
pulse = pulsectl.Pulse()
self.pulse_events = pulsectl.Pulse('event-printer')
si, sink_inputs, modules, clients, sink_list = pulse.server_info(), pulse.sink_input_list(), pulse.module_list(), pulse.client_list(), pulse.sink_list()
self.add(button)
self.add(glade_box)
logger.debug(f"\n\nServer Info\n{si}\n\nSink Inputs:")
for sink in sink_list:
self.add( AudioSink(pulse, sink) )
self.pulse = pulse
self.sink_inputs = sink_inputs
for sink_input in sink_inputs:
box.add( AudioSink(pulse, sink_input) )
except Exception as e:
logger.debug(f"{e}")
self._box = box
self._scroll = scroll.get_vadjustment()
def _hello_world(self, widget=None, eve=None):
logger.debug("Hello, World!")
self._scroll.connect("changed", self._scroll_to_bottom)
scroll.set_vexpand(True)
scroll.set_overlay_scrolling(False)
scroll.set_margin_top(20)
scroll.set_margin_bottom(10)
box.set_orientation(Gtk.Orientation.VERTICAL)
self.add(scroll)
self.set_pulse_event_listener()
def _handle_new_sync_input(self, index):
sink_input = self.pulse.sink_input_list()[-1]
self._box.add( AudioSink(self.pulse, sink_input) )
def _handle_del_sync_input(self, index):
for sink_input in self.sink_inputs:
if index == sink_input.index:
self.sink_inputs.remove(sink_input)
for child in self._box.get_children():
if index == child.sink.index:
child.destroy()
def _handle_cng_sync_input(self, index):
for child in self._box.get_children():
if index == child.sink.index:
child.do_update()
def _scroll_to_bottom(self, adjustment):
self._scroll.set_value( adjustment.get_upper() )
@daemon_threaded
def set_pulse_event_listener(self):
try:
self.pulse_events.event_mask_set('all')
self.pulse_events.event_callback_set(self._sub_threaded_event)
self.pulse_events.event_listen(timeout = 0)
except Exception as e:
logger.debug(f"{e}")
def _sub_threaded_event(self, eve):
logger.debug(f"Pulse event: {eve}")
GLib.idle_add(self.handle_event, *(eve,))
def handle_event(self, eve):
if eve.facility == "sink_input":
if eve.t == "change":
event_system.emit("handle_cng_sync_input", (eve.index,))
if eve.t == "new":
event_system.emit("handle_new_sync_input", (eve.index,))
if eve.t == "remove":
event_system.emit("handle_del_sync_input", (eve.index,))

View File

@ -4,20 +4,15 @@ import os
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
from gi.repository import Gtk
from gi.repository import Gdk
from gi.repository import GLib
# Application imports
from .mixins.signals_mixins import SignalsMixins
from .mixins.dummy_mixin import DummyMixin
from .controller_data import ControllerData
from .containers.base_container import BaseContainer
class Controller(DummyMixin, SignalsMixins, ControllerData):
class Controller(ControllerData):
def __init__(self, args, unknownargs):
self.setup_controller_data()
@ -25,34 +20,14 @@ class Controller(DummyMixin, SignalsMixins, ControllerData):
self._setup_signals()
self._subscribe_to_events()
self.print_hello_world() # A mixin method from the DummyMixin file
if args.no_plugins == "false":
self.plugins.launch_plugins()
for arg in unknownargs + [args.new_tab,]:
if os.path.isfile(arg):
message = f"FILE|{arg}"
event_system.emit("post_file_to_ipc", message)
if os.path.isdir(arg):
message = f"DIR|{arg}"
event_system.emit("post_file_to_ipc", message)
logger.info(f"Made it past {self.__class__} loading...")
def _setup_styling(self):
...
def _setup_signals(self):
self.window.connect("focus-out-event", self.unset_keys_and_data)
self.window.connect("key-press-event", self.on_global_key_press_controller)
self.window.connect("key-release-event", self.on_global_key_release_controller)
...
def _subscribe_to_events(self):
event_system.subscribe("handle_file_from_ipc", self.handle_file_from_ipc)
event_system.subscribe("handle_dir_from_ipc", self.handle_dir_from_ipc)
event_system.subscribe("tggl_top_main_menubar", self._tggl_top_main_menubar)
def _tggl_top_main_menubar(self):
@ -60,9 +35,6 @@ class Controller(DummyMixin, SignalsMixins, ControllerData):
def setup_builder_and_container(self):
self.builder = Gtk.Builder()
self.builder.add_from_file(settings.get_glade_file())
self.builder.expose_object("main_window", self.window)
settings.set_builder(self.builder)
self.base_container = BaseContainer()

View File

@ -5,7 +5,6 @@ import subprocess
# Lib imports
# Application imports
from plugins.plugins_controller import PluginsController
@ -22,7 +21,6 @@ class ControllerData:
self.alt_down = False
self.setup_builder_and_container()
self.plugins = PluginsController()
def clear_console(self) -> None:

View File

@ -1,3 +0,0 @@
"""
Generic Mixins Module
"""

View File

@ -1,13 +0,0 @@
# Python imports
# Lib imports
# Application imports
class DummyMixin:
""" DummyMixin is an example of how mixins are used and structured in a project. """
def print_hello_world(self) -> None:
logger.debug("Hello, World!")

View File

@ -1,3 +0,0 @@
"""
Signals module
"""

View File

@ -1,20 +0,0 @@
# Python imports
# Lib imports
# Application imports
class IPCSignalsMixin:
""" IPCSignalsMixin handle messages from another starting solarfm process. """
def print_to_console(self, message=None):
logger.debug(message)
def handle_file_from_ipc(self, path: str) -> None:
logger.debug(f"File From IPC: {path}")
def handle_dir_from_ipc(self, path: str) -> None:
logger.debug(f"Dir From IPC: {path}")

View File

@ -1,94 +0,0 @@
# Python imports
import re
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')
from gi.repository import Gtk
from gi.repository import Gdk
# Application imports
valid_keyvalue_pat = re.compile(r"[a-z0-9A-Z-_\[\]\(\)\| ]")
class KeyboardSignalsMixin:
""" KeyboardSignalsMixin keyboard hooks controller. """
# TODO: Need to set methods that use this to somehow check the keybindings state instead.
def unset_keys_and_data(self, widget=None, eve=None):
self.ctrl_down = False
self.shift_down = False
self.alt_down = False
def on_global_key_press_controller(self, eve, user_data):
keyname = Gdk.keyval_name(user_data.keyval).lower()
modifiers = Gdk.ModifierType(user_data.get_state() & ~Gdk.ModifierType.LOCK_MASK)
self.was_midified_key = True if modifiers != 0 else False
if keyname.replace("_l", "").replace("_r", "") in ["control", "alt", "shift"]:
if "control" in keyname:
self.ctrl_down = True
if "shift" in keyname:
self.shift_down = True
if "alt" in keyname:
self.alt_down = True
def on_global_key_release_controller(self, widget, event):
""" Handler for keyboard events """
keyname = Gdk.keyval_name(event.keyval).lower()
modifiers = Gdk.ModifierType(event.get_state() & ~Gdk.ModifierType.LOCK_MASK)
if keyname.replace("_l", "").replace("_r", "") in ["control", "alt", "shift"]:
should_return = self.was_midified_key and (self.ctrl_down or self.shift_down or self.alt_down)
if "control" in keyname:
self.ctrl_down = False
if "shift" in keyname:
self.shift_down = False
if "alt" in keyname:
self.alt_down = False
# NOTE: In effect a filter after releasing a modifier and we have a modifier mapped
if should_return:
self.was_midified_key = False
return
mapping = keybindings.lookup(event)
logger.debug(f"on_global_key_release_controller > key > {keyname}")
logger.debug(f"on_global_key_release_controller > keyval > {event.keyval}")
logger.debug(f"on_global_key_release_controller > mapping > {mapping}")
if mapping:
# See if in controller scope
try:
getattr(self, mapping)()
return True
except Exception:
# Must be plugins scope, event call, OR we forgot to add method to controller scope
if "||" in mapping:
sender, eve_type = mapping.split("||")
else:
sender = ""
eve_type = mapping
self.handle_key_event_system(sender, eve_type)
else:
logger.debug(f"on_global_key_release_controller > key > {keyname}")
if self.ctrl_down:
if not keyname in ["1", "kp_1", "2", "kp_2", "3", "kp_3", "4", "kp_4"]:
self.handle_key_event_system(None, mapping)
else:
...
def handle_key_event_system(self, sender, eve_type):
event_system.emit(eve_type)
def keyboard_close_tab(self):
...

View File

@ -1,13 +0,0 @@
# Python imports
# Lib imports
from .signals.ipc_signals_mixin import IPCSignalsMixin
from .signals.keyboard_signals_mixin import KeyboardSignalsMixin
# Application imports
class SignalsMixins(KeyboardSignalsMixin, IPCSignalsMixin):
...

View File

@ -0,0 +1,120 @@
# Python imports
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
# Application imports
class AudioSink(Gtk.Box):
"""docstring for AudioSink."""
def __init__(self, pulse, sink):
super(AudioSink, self).__init__()
self._scale = None
self._mute_btn = None
self.pulse = pulse
self.sink = sink
self.min_range = 0
self.max_range = 150
self.muted = False
self.block_update = False
self._setup_styling()
self._setup_signals()
self._load_widgets()
self.show_all()
def _setup_styling(self):
self.set_orientation(Gtk.Orientation.VERTICAL)
self.set_margin_top(10)
self.set_margin_bottom(10)
def _setup_signals(self):
...
def _load_widgets(self):
current_lvl = self.sink.volume.values[0] * 100
box = Gtk.Box()
label = Gtk.Label()
scale = Gtk.Scale.new_with_range(Gtk.Orientation.HORIZONTAL, self.min_range, self.max_range, 1)
mute_btn = Gtk.Button()
self.muted = False if self.sink.mute == 0 else True
# logger.debug(f"{self.sink.proplist}")
try:
name = self.sink.description
except Exception as e:
name = self.sink.proplist["application.name"]
label.set_text(name)
label.set_xalign(0.0)
scale.add_mark(0.0, Gtk.PositionType.BOTTOM, "Silence")
scale.add_mark(50.0, Gtk.PositionType.BOTTOM, "50%")
scale.add_mark(100.0, Gtk.PositionType.BOTTOM, "100%")
scale.set_value(current_lvl)
scale.set_hexpand(True)
box.set_orientation(Gtk.Orientation.HORIZONTAL)
mute_btn.set_margin_left(10)
self.set_mute_image(mute_btn)
scale.connect("value-changed", self.set_volume)
mute_btn.connect("clicked", self.toggle_mute)
mute_btn.set_tooltip_text("Mute...")
self._scale = scale
self._mute_btn = mute_btn
self.add(label)
box.add(scale)
box.add(mute_btn)
self.add(box)
def set_volume(self, range):
if self.block_update:
return
value = range.get_value() / 100
sink_vol_info = self.sink.volume
new_level = []
for level in sink_vol_info.values:
new_level.append(value)
sink_vol_info.values = new_level
self.pulse.sink_input_volume_set(self.sink.index, sink_vol_info)
def toggle_mute(self, widget = None, eve = None):
self.muted = not self.muted
self.set_mute_image(widget)
self.pulse.sink_input_mute(self.sink.index, mute=self.muted)
def set_mute_image(self, mute_btn):
if self.muted:
mute_btn.set_image( Gtk.Image.new_from_icon_name("gtk-disconnect", 3) )
else:
mute_btn.set_image( Gtk.Image.new_from_icon_name("gtk-connect", 3) )
def do_update(self):
self.block_update = True
sink_inputs = self.pulse.sink_input_list()
for sink_input in sink_inputs:
if sink_input.index == self.sink.index:
self.sink = sink_input
sink_vol_info = self.sink.volume.values
self.muted = False if self.sink.mute == 0 else True
self.set_mute_image(self._mute_btn)
self._scale.set_value( sink_vol_info[0] * 100 )
self.block_update = False

View File

@ -1,3 +0,0 @@
"""
Gtk Bound Plugins Module
"""

View File

@ -1,64 +0,0 @@
# Python imports
import os
import json
from os.path import join
# Lib imports
# Application imports
class ManifestProcessor(Exception):
...
class Plugin:
path: str = None
name: str = None
author: str = None
version: str = None
support: str = None
requests:{} = None
reference: type = None
class ManifestProcessor:
def __init__(self, path, builder):
manifest = join(path, "manifest.json")
if not os.path.exists(manifest):
raise Exception("Invalid Plugin Structure: Plugin doesn't have 'manifest.json'. Aboarting load...")
self._path = path
self._builder = builder
with open(manifest) as f:
data = json.load(f)
self._manifest = data["manifest"]
self._plugin = self.collect_info()
def collect_info(self) -> Plugin:
plugin = Plugin()
plugin.path = self._path
plugin.name = self._manifest["name"]
plugin.author = self._manifest["author"]
plugin.version = self._manifest["version"]
plugin.support = self._manifest["support"]
plugin.requests = self._manifest["requests"]
return plugin
def get_loading_data(self):
loading_data = {}
requests = self._plugin.requests
keys = requests.keys()
if "pass_events" in keys:
if requests["pass_events"] in ["true"]:
loading_data["pass_events"] = True
if "bind_keys" in keys:
if isinstance(requests["bind_keys"], list):
loading_data["bind_keys"] = requests["bind_keys"]
return self._plugin, loading_data

View File

@ -1,61 +0,0 @@
# Python imports
import os
import time
# Lib imports
# Application imports
class PluginBaseException(Exception):
...
class PluginBase:
def __init__(self):
self.name = "Example Plugin" # NOTE: Need to remove after establishing private bidirectional 1-1 message bus
# where self.name should not be needed for message comms
self._builder = None
self._ui_objects = None
self._event_system = None
def run(self):
"""
Must define regardless if needed and can 'pass' if plugin doesn't need it.
Is intended to be used to setup internal signals or custom Gtk Builders/UI logic.
"""
raise PluginBaseException("Method hasn't been overriden...")
def generate_reference_ui_element(self):
"""
Requests Key: 'ui_target': "plugin_control_list",
Must define regardless if needed and can 'pass' if plugin doesn't use it.
Must return a widget if "ui_target" is set.
"""
raise PluginBaseException("Method hasn't been overriden...")
def set_event_system(self, event_system):
"""
Requests Key: 'pass_events': "true"
Must define in plugin if "pass_events" is set to "true" string.
"""
self._event_system = event_system
def set_ui_object_collection(self, ui_objects):
"""
Requests Key: "pass_ui_objects": [""]
Request reference to a UI component. Will be passed back as array to plugin.
Must define in plugin if set and an array of valid glade UI IDs is given.
"""
self._ui_objects = ui_objects
def subscribe_to_events(self):
...
def clear_children(self, widget: type) -> None:
""" Clear children of a gtk widget. """
for child in widget.get_children():
widget.remove(child)

View File

@ -1,118 +0,0 @@
# Python imports
import os
import sys
import importlib
import traceback
from os.path import join
from os.path import isdir
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import Gio
# Application imports
from .manifest import Plugin
from .manifest import ManifestProcessor
class InvalidPluginException(Exception):
...
class PluginsController:
"""PluginsController controller"""
def __init__(self):
path = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, path) # NOTE: I think I'm not using this correctly...
self._builder = settings.get_builder()
self._plugins_path = settings.get_plugins_path()
self._plugins_dir_watcher = None
self._plugin_collection = []
def launch_plugins(self) -> None:
self._set_plugins_watcher()
self.load_plugins()
def _set_plugins_watcher(self) -> None:
self._plugins_dir_watcher = Gio.File.new_for_path(self._plugins_path) \
.monitor_directory(Gio.FileMonitorFlags.WATCH_MOVES, Gio.Cancellable())
self._plugins_dir_watcher.connect("changed", self._on_plugins_changed, ())
def _on_plugins_changed(self, file_monitor, file, other_file=None, eve_type=None, data=None):
if eve_type in [Gio.FileMonitorEvent.CREATED, Gio.FileMonitorEvent.DELETED,
Gio.FileMonitorEvent.RENAMED, Gio.FileMonitorEvent.MOVED_IN,
Gio.FileMonitorEvent.MOVED_OUT]:
self.reload_plugins(file)
def load_plugins(self, file: str = None) -> None:
logger.debug(f"Loading plugins...")
parent_path = os.getcwd()
for path, folder in [[join(self._plugins_path, item), item] if os.path.isdir(join(self._plugins_path, item)) else None for item in os.listdir(self._plugins_path)]:
try:
target = join(path, "plugin.py")
manifest = ManifestProcessor(path, self._builder)
if not os.path.exists(target):
raise InvalidPluginException("Invalid Plugin Structure: Plugin doesn't have 'plugin.py'. Aboarting load...")
plugin, loading_data = manifest.get_loading_data()
module = self.load_plugin_module(path, folder, target)
self.execute_plugin(module, plugin, loading_data)
except Exception as e:
logger.debug(f"Malformed Plugin: Not loading -->: '{folder}' !\n{traceback.print_exc()}")
os.chdir(parent_path)
def load_plugin_module(self, path, folder, target):
os.chdir(path)
locations = []
self.collect_search_locations(path, locations)
spec = importlib.util.spec_from_file_location(folder, target, submodule_search_locations = locations)
module = importlib.util.module_from_spec(spec)
sys.modules[folder] = module
spec.loader.exec_module(module)
return module
def collect_search_locations(self, path, locations):
locations.append(path)
for file in os.listdir(path):
_path = os.path.join(path, file)
if os.path.isdir(_path):
self.collect_search_locations(_path, locations)
def execute_plugin(self, module: type, plugin: Plugin, loading_data: []):
plugin.reference = module.Plugin()
keys = loading_data.keys()
if "ui_target" in keys:
loading_data["ui_target"].add( plugin.reference.generate_reference_ui_element() )
loading_data["ui_target"].show_all()
if "pass_ui_objects" in keys:
plugin.reference.set_ui_object_collection( loading_data["pass_ui_objects"] )
if "pass_events" in keys:
plugin.reference.set_fm_event_system(event_system)
plugin.reference.subscribe_to_events()
if "bind_keys" in keys:
keybindings.append_bindings( loading_data["bind_keys"] )
plugin.reference.run()
self._plugin_collection.append(plugin)
def reload_plugins(self, file: str = None) -> None:
logger.debug(f"Reloading plugins... stub.")

View File

@ -19,12 +19,29 @@ def debug_signal_handler(signal, frame):
rpdb2.setbreak(depth=1)
return
except StandardError:
pass
...
try:
from rfoo.utils import rconsole
logger.debug("\n\nStarting embedded rconsole debugger...\n\n")
rconsole.spawn_server()
return
except StandardError as ex:
...
try:
from pudb import set_trace
logger.debug("\n\nStarting PuDB debugger...\n\n")
set_trace(paused = True)
return
except StandardError as ex:
...
try:
import pdb
logger.debug("\n\nStarting embedded PDB debugger...\n\n")
pdb.Pdb(skip=['gi.*']).set_trace()
return
except StandardError as ex:
...

View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
from . import _pulsectl
from .pulsectl import (
PulsePortInfo, PulseClientInfo, PulseServerInfo, PulseModuleInfo,
PulseSinkInfo, PulseSinkInputInfo, PulseSourceInfo, PulseSourceOutputInfo,
PulseCardProfileInfo, PulseCardPortInfo, PulseCardInfo, PulseVolumeInfo,
PulseExtStreamRestoreInfo, PulseEventInfo,
PulseEventTypeEnum, PulseEventFacilityEnum, PulseEventMaskEnum,
PulseStateEnum, PulseUpdateEnum, PulsePortAvailableEnum, PulseDirectionEnum,
PulseError, PulseIndexError, PulseOperationFailed, PulseOperationInvalid,
PulseLoopStop, PulseDisconnected, PulseObject, Pulse, connect_to_cli )

View File

@ -0,0 +1,688 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
# C Bindings
import os, sys, ctypes.util, functools as ft
from ctypes import *
force_str = lambda s, errors='strict': s.decode('utf-8', errors) if isinstance(s, bytes) else s
force_bytes = lambda s, errors='strict': s.encode('utf-8', errors) if isinstance(s, unicode) else s
if sys.version_info.major >= 3:
class c_str_p_type(object):
c_type = c_char_p
def __call__(self, val): return force_str(val)
def from_param(self, val):
# int will be interpreted as pointer and segfault in py3
if isinstance(val, int): raise ArgumentError(type(val))
return force_bytes(val)
unicode, c_str_p = str, c_str_p_type()
import time
mono_time = time.monotonic
else:
c_str_p = c_char_p
def mono_time():
if not hasattr(mono_time, 'ts'):
class timespec(Structure):
_fields_ = [('tv_sec', c_long), ('tv_nsec', c_long)]
librt = CDLL('librt.so.1', use_errno=True)
mono_time.get = librt.clock_gettime
mono_time.get.argtypes = [c_int, POINTER(timespec)]
mono_time.ts = timespec
ts = mono_time.ts()
if mono_time.get(4, pointer(ts)) != 0:
err = get_errno()
raise OSError(err, os.strerror(err))
return ts.tv_sec + ts.tv_nsec * 1e-9
PA_INVALID = 2**32-1
PA_VOLUME_NORM = 0x10000
PA_VOLUME_MAX = (2**32-1) // 2 # was different before pulseaudio-1.0, see 179b291b there
PA_VOLUME_INVALID = 2**32-1
pa_sw_volume_from_dB = lambda db:\
min(PA_VOLUME_MAX, int(round(((10.0 ** (db / 20.0)) ** (1/3)) * PA_VOLUME_NORM)))
PA_VOLUME_UI_MAX = 99957 # pa_sw_volume_from_dB(+11.0)
PA_CHANNELS_MAX = 32
PA_USEC_T = c_uint64
PA_CONTEXT_NOAUTOSPAWN = 0x0001
PA_CONTEXT_NOFAIL = 0x0002
PA_CONTEXT_UNCONNECTED = 0
PA_CONTEXT_CONNECTING = 1
PA_CONTEXT_AUTHORIZING = 2
PA_CONTEXT_SETTING_NAME = 3
PA_CONTEXT_READY = 4
PA_CONTEXT_FAILED = 5
PA_CONTEXT_TERMINATED = 6
PA_SUBSCRIPTION_MASK_NULL = 0x0000
PA_SUBSCRIPTION_MASK_SINK = 0x0001
PA_SUBSCRIPTION_MASK_SOURCE = 0x0002
PA_SUBSCRIPTION_MASK_SINK_INPUT = 0x0004
PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT = 0x0008
PA_SUBSCRIPTION_MASK_MODULE = 0x0010
PA_SUBSCRIPTION_MASK_CLIENT = 0x0020
PA_SUBSCRIPTION_MASK_SAMPLE_CACHE = 0x0040
PA_SUBSCRIPTION_MASK_SERVER = 0x0080
PA_SUBSCRIPTION_MASK_AUTOLOAD = 0x0100
PA_SUBSCRIPTION_MASK_CARD = 0x0200
PA_SUBSCRIPTION_MASK_ALL = 0x02ff
PA_SUBSCRIPTION_EVENT_SINK = 0x0000
PA_SUBSCRIPTION_EVENT_SOURCE = 0x0001
PA_SUBSCRIPTION_EVENT_SINK_INPUT = 0x0002
PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT = 0x0003
PA_SUBSCRIPTION_EVENT_MODULE = 0x0004
PA_SUBSCRIPTION_EVENT_CLIENT = 0x0005
PA_SUBSCRIPTION_EVENT_SAMPLE_CACHE = 0x0006
PA_SUBSCRIPTION_EVENT_SERVER = 0x0007
PA_SUBSCRIPTION_EVENT_AUTOLOAD = 0x0008
PA_SUBSCRIPTION_EVENT_CARD = 0x0009
PA_SUBSCRIPTION_EVENT_FACILITY_MASK = 0x000F
PA_SUBSCRIPTION_EVENT_NEW = 0x0000
PA_SUBSCRIPTION_EVENT_CHANGE = 0x0010
PA_SUBSCRIPTION_EVENT_REMOVE = 0x0020
PA_SUBSCRIPTION_EVENT_TYPE_MASK = 0x0030
PA_SAMPLE_FLOAT32LE = 5
PA_SAMPLE_FLOAT32BE = 6
PA_SAMPLE_FLOAT32NE = dict(
little=PA_SAMPLE_FLOAT32LE,
big=PA_SAMPLE_FLOAT32BE )[sys.byteorder]
PA_STREAM_DONT_MOVE = 0x0200
PA_STREAM_PEAK_DETECT = 0x0800
PA_STREAM_ADJUST_LATENCY = 0x2000
PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND = 0x8000
def c_enum_map(**values):
return dict((v, force_str(k)) for k,v in values.items())
_globals = globals().copy()
_pa_ev_type = dict(
(force_str(k), _globals['PA_SUBSCRIPTION_EVENT_{}'.format(k.upper())])
for k in 'new change remove'.split() )
_pa_ev_fac, _pa_ev_mask = dict(), dict()
for k, n in _globals.items():
if k.startswith('PA_SUBSCRIPTION_EVENT_'):
if k.endswith('_MASK'): continue
k = force_str(k[22:].lower())
if k in _pa_ev_type: continue
assert n & PA_SUBSCRIPTION_EVENT_FACILITY_MASK == n, [k, n]
_pa_ev_fac[k] = n
elif k.startswith('PA_SUBSCRIPTION_MASK_'):
_pa_ev_mask[force_str(k[21:].lower())] = n
PA_EVENT_TYPE_MAP = c_enum_map(**_pa_ev_type)
PA_EVENT_FACILITY_MAP = c_enum_map(**_pa_ev_fac)
PA_EVENT_MASK_MAP = c_enum_map(**_pa_ev_mask)
del _globals, _pa_ev_type, _pa_ev_fac, _pa_ev_mask
PA_UPDATE_MAP = c_enum_map(set=0, merge=1, replace=2)
PA_PORT_AVAILABLE_MAP = c_enum_map(unknown=0, no=1, yes=2)
PA_DIRECTION_MAP = c_enum_map(unknown=0, output=1, input=2)
# These are defined separately as
# pa_sink_state / pa_source_state, but seem to match.
PA_OBJ_STATE_MAP = c_enum_map(invalid=-1, running=0, idle=1, suspended=2)
class PA_MAINLOOP(Structure): pass
class PA_STREAM(Structure): pass
class PA_MAINLOOP_API(Structure): pass
class PA_CONTEXT(Structure): pass
class PA_PROPLIST(Structure): pass
class PA_OPERATION(Structure): pass
class PA_SIGNAL_EVENT(Structure): pass
class PA_IO_EVENT(Structure): pass
class PA_SAMPLE_SPEC(Structure):
_fields_ = [
('format', c_int),
('rate', c_uint32),
('channels', c_uint32)
]
class PA_CHANNEL_MAP(Structure):
_fields_ = [
('channels', c_uint8),
('map', c_int * PA_CHANNELS_MAX)
]
class PA_CVOLUME(Structure):
_fields_ = [
('channels', c_uint8),
('values', c_uint32 * PA_CHANNELS_MAX)
]
class PA_PORT_INFO(Structure):
_fields_ = [
('name', c_char_p),
('description', c_char_p),
('priority', c_uint32),
('available', c_int),
]
class PA_SINK_INPUT_INFO(Structure):
_fields_ = [
('index', c_uint32),
('name', c_char_p),
('owner_module', c_uint32),
('client', c_uint32),
('sink', c_uint32),
('sample_spec', PA_SAMPLE_SPEC),
('channel_map', PA_CHANNEL_MAP),
('volume', PA_CVOLUME),
('buffer_usec', PA_USEC_T),
('sink_usec', PA_USEC_T),
('resample_method', c_char_p),
('driver', c_char_p),
('mute', c_int),
('proplist', POINTER(PA_PROPLIST)),
('corked', c_int),
('has_volume', c_int),
('volume_writable', c_int),
]
class PA_SINK_INFO(Structure):
_fields_ = [
('name', c_char_p),
('index', c_uint32),
('description', c_char_p),
('sample_spec', PA_SAMPLE_SPEC),
('channel_map', PA_CHANNEL_MAP),
('owner_module', c_uint32),
('volume', PA_CVOLUME),
('mute', c_int),
('monitor_source', c_uint32),
('monitor_source_name', c_char_p),
('latency', PA_USEC_T),
('driver', c_char_p),
('flags', c_int),
('proplist', POINTER(PA_PROPLIST)),
('configured_latency', PA_USEC_T),
('base_volume', c_uint32),
('state', c_int),
('n_volume_steps', c_int),
('card', c_uint32),
('n_ports', c_uint32),
('ports', POINTER(POINTER(PA_PORT_INFO))),
('active_port', POINTER(PA_PORT_INFO)),
]
class PA_SOURCE_OUTPUT_INFO(Structure):
_fields_ = [
('index', c_uint32),
('name', c_char_p),
('owner_module', c_uint32),
('client', c_uint32),
('source', c_uint32),
('sample_spec', PA_SAMPLE_SPEC),
('channel_map', PA_CHANNEL_MAP),
('buffer_usec', PA_USEC_T),
('source_usec', PA_USEC_T),
('resample_method', c_char_p),
('driver', c_char_p),
('proplist', POINTER(PA_PROPLIST)),
('corked', c_int),
('volume', PA_CVOLUME),
('mute', c_int),
('has_volume', c_int),
('volume_writable', c_int),
]
class PA_SOURCE_INFO(Structure):
_fields_ = [
('name', c_char_p),
('index', c_uint32),
('description', c_char_p),
('sample_spec', PA_SAMPLE_SPEC),
('channel_map', PA_CHANNEL_MAP),
('owner_module', c_uint32),
('volume', PA_CVOLUME),
('mute', c_int),
('monitor_of_sink', c_uint32),
('monitor_of_sink_name', c_char_p),
('latency', PA_USEC_T),
('driver', c_char_p),
('flags', c_int),
('proplist', POINTER(PA_PROPLIST)),
('configured_latency', PA_USEC_T),
('base_volume', c_uint32),
('state', c_int),
('n_volume_steps', c_int),
('card', c_uint32),
('n_ports', c_uint32),
('ports', POINTER(POINTER(PA_PORT_INFO))),
('active_port', POINTER(PA_PORT_INFO)),
]
class PA_CLIENT_INFO(Structure):
_fields_ = [
('index', c_uint32),
('name', c_char_p),
('owner_module', c_uint32),
('driver', c_char_p),
('proplist', POINTER(PA_PROPLIST)),
]
class PA_SERVER_INFO(Structure):
_fields_ = [
('user_name', c_char_p),
('host_name', c_char_p),
('server_version', c_char_p),
('server_name', c_char_p),
('sample_spec', PA_SAMPLE_SPEC),
('default_sink_name', c_char_p),
('default_source_name', c_char_p),
('cookie', c_uint32),
('channel_map', PA_CHANNEL_MAP),
]
class PA_CARD_PROFILE_INFO(Structure):
_fields_ = [
('name', c_char_p),
('description', c_char_p),
('n_sinks', c_uint32),
('n_sources', c_uint32),
('priority', c_uint32),
('available', c_int),
]
# Extends PA_PORT_INFO with a few card-specific things
class PA_CARD_PORT_INFO(Structure):
_fields_ = [
('name', c_char_p),
('description', c_char_p),
('priority', c_uint32),
('available', c_int),
('direction', c_int),
('n_profiles', c_uint32),
('profiles', c_void_p), # use profiles2
('proplist', POINTER(PA_PROPLIST)),
('latency_offset', c_int64),
('profiles2', POINTER(POINTER(PA_CARD_PROFILE_INFO))),
]
class PA_CARD_INFO(Structure):
_fields_ = [
('index', c_uint32),
('name', c_char_p),
('owner_module', c_uint32),
('driver', c_char_p),
('n_profiles', c_uint32),
('profiles', c_void_p), # use profiles2 / active_profile2
('active_profile', c_void_p),
('proplist', POINTER(PA_PROPLIST)),
('n_ports', c_uint32),
('ports', POINTER(POINTER(PA_CARD_PORT_INFO))),
('profiles2', POINTER(POINTER(PA_CARD_PROFILE_INFO))),
('active_profile2', POINTER(PA_CARD_PROFILE_INFO)),
]
class PA_MODULE_INFO(Structure):
_fields_ = [
('index', c_uint32),
('name', c_char_p),
('argument', c_char_p),
('n_used', c_uint32),
('auto_unload', c_int),
('proplist', POINTER(PA_PROPLIST)),
]
class PA_EXT_STREAM_RESTORE_INFO(Structure):
_fields_ = [
('name', c_char_p),
('channel_map', PA_CHANNEL_MAP),
('volume', PA_CVOLUME),
('device', c_char_p),
('mute', c_int),
]
class PA_BUFFER_ATTR(Structure):
_fields_ = [
('maxlength', c_uint32),
('tlength', c_uint32),
('prebuf', c_uint32),
('minreq', c_uint32),
('fragsize', c_uint32),
]
class POLLFD(Structure):
_fields_ = [
('fd', c_int),
('events', c_short),
('revents', c_short),
]
PA_POLL_FUNC_T = CFUNCTYPE(c_int,
POINTER(POLLFD),
c_ulong,
c_int,
c_void_p)
PA_SIGNAL_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_MAINLOOP_API),
POINTER(c_int),
c_int,
c_void_p)
PA_STATE_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
c_void_p)
PA_CLIENT_INFO_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_CLIENT_INFO),
c_int,
c_void_p)
PA_SERVER_INFO_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_SERVER_INFO),
c_void_p)
PA_SINK_INPUT_INFO_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_SINK_INPUT_INFO),
c_int,
c_void_p)
PA_SINK_INFO_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_SINK_INFO),
c_int,
c_void_p)
PA_SOURCE_OUTPUT_INFO_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_SOURCE_OUTPUT_INFO),
c_int,
c_void_p)
PA_SOURCE_INFO_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_SOURCE_INFO),
c_int,
c_void_p)
PA_CONTEXT_DRAIN_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
c_void_p)
PA_CONTEXT_INDEX_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
c_uint32,
c_void_p)
PA_CONTEXT_SUCCESS_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
c_int,
c_void_p)
PA_EXT_STREAM_RESTORE_TEST_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
c_uint32,
c_void_p)
PA_EXT_STREAM_RESTORE_READ_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_EXT_STREAM_RESTORE_INFO),
c_int,
c_void_p)
PA_CARD_INFO_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_CARD_INFO),
c_int,
c_void_p)
PA_MODULE_INFO_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
POINTER(PA_MODULE_INFO),
c_int,
c_void_p)
PA_SUBSCRIBE_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_CONTEXT),
c_int,
c_int,
c_void_p)
PA_STREAM_REQUEST_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_STREAM),
c_int,
c_void_p)
PA_STREAM_NOTIFY_CB_T = CFUNCTYPE(c_void_p,
POINTER(PA_STREAM),
c_void_p)
class LibPulse(object):
# func_def ::= arg_types_list | (arg_types_list, res_spec) | (res_spec, arg_types_list)
# res_spec ::= ctypes_restype
# | res_proc_func | (ctypes_restype, res_proc_func)
# | res_spec_name_str | (ctypes_restype, res_spec_name_str)
# res_spec_name_str ::= 'int_check_ge0' | 'pa_op' | ...
func_defs = dict(
pa_strerror=([c_int], c_str_p),
pa_runtime_path=([c_str_p], (c_char_p, 'not_null')),
pa_operation_unref=[POINTER(PA_OPERATION)],
pa_mainloop_new=(POINTER(PA_MAINLOOP)),
pa_mainloop_get_api=([POINTER(PA_MAINLOOP)], POINTER(PA_MAINLOOP_API)),
pa_mainloop_run=([POINTER(PA_MAINLOOP), POINTER(c_int)], c_int),
pa_mainloop_prepare=([POINTER(PA_MAINLOOP), c_int], 'int_check_ge0'),
pa_mainloop_poll=([POINTER(PA_MAINLOOP)], 'int_check_ge0'),
pa_mainloop_dispatch=([POINTER(PA_MAINLOOP)], 'int_check_ge0'),
pa_mainloop_iterate=([POINTER(PA_MAINLOOP), c_int, POINTER(c_int)], 'int_check_ge0'),
pa_mainloop_wakeup=[POINTER(PA_MAINLOOP)],
pa_mainloop_set_poll_func=[POINTER(PA_MAINLOOP), PA_POLL_FUNC_T, c_void_p],
pa_mainloop_quit=([POINTER(PA_MAINLOOP), c_int]),
pa_mainloop_free=[POINTER(PA_MAINLOOP)],
pa_signal_init=([POINTER(PA_MAINLOOP_API)], 'int_check_ge0'),
pa_signal_new=([c_int, PA_SIGNAL_CB_T, POINTER(PA_SIGNAL_EVENT)]),
pa_signal_done=None,
pa_context_errno=([POINTER(PA_CONTEXT)], c_int),
pa_context_new=([POINTER(PA_MAINLOOP_API), c_str_p], POINTER(PA_CONTEXT)),
pa_context_set_state_callback=([POINTER(PA_CONTEXT), PA_STATE_CB_T, c_void_p]),
pa_context_connect=([POINTER(PA_CONTEXT), c_str_p, c_int, POINTER(c_int)], 'int_check_ge0'),
pa_context_get_state=([POINTER(PA_CONTEXT)], c_int),
pa_context_disconnect=[POINTER(PA_CONTEXT)],
pa_context_unref=[POINTER(PA_CONTEXT)],
pa_context_drain=( 'pa_op',
[POINTER(PA_CONTEXT), PA_CONTEXT_DRAIN_CB_T, c_void_p] ),
pa_context_set_default_sink=( 'pa_op',
[POINTER(PA_CONTEXT), c_str_p, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_default_source=( 'pa_op',
[POINTER(PA_CONTEXT), c_str_p, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_get_sink_input_info_list=( 'pa_op',
[POINTER(PA_CONTEXT), PA_SINK_INPUT_INFO_CB_T, c_void_p] ),
pa_context_get_sink_input_info=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_SINK_INPUT_INFO_CB_T, c_void_p] ),
pa_context_get_sink_info_list=( 'pa_op',
[POINTER(PA_CONTEXT), PA_SINK_INFO_CB_T, c_void_p] ),
pa_context_get_sink_info_by_name=( 'pa_op',
[POINTER(PA_CONTEXT), c_str_p, PA_SINK_INFO_CB_T, c_void_p] ),
pa_context_get_sink_info_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_SINK_INFO_CB_T, c_void_p] ),
pa_context_set_sink_mute_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_int, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_suspend_sink_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_int, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_sink_port_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_str_p, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_sink_input_mute=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_int, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_sink_volume_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, POINTER(PA_CVOLUME), PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_sink_input_volume=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, POINTER(PA_CVOLUME), PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_move_sink_input_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_uint32, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_get_source_output_info=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_SOURCE_OUTPUT_INFO_CB_T, c_void_p] ),
pa_context_get_source_output_info_list=( 'pa_op',
[POINTER(PA_CONTEXT), PA_SOURCE_OUTPUT_INFO_CB_T, c_void_p] ),
pa_context_move_source_output_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_uint32, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_source_output_volume=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, POINTER(PA_CVOLUME), PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_source_output_mute=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_int, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_kill_source_output=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_get_source_info_list=( 'pa_op',
[POINTER(PA_CONTEXT), PA_SOURCE_INFO_CB_T, c_void_p] ),
pa_context_get_source_info_by_name=( 'pa_op',
[POINTER(PA_CONTEXT), c_str_p, PA_SOURCE_INFO_CB_T, c_void_p] ),
pa_context_get_source_info_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_SOURCE_INFO_CB_T, c_void_p] ),
pa_context_set_source_volume_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, POINTER(PA_CVOLUME), PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_source_mute_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_int, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_suspend_source_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_int, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_source_port_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_str_p, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_get_client_info_list=( 'pa_op',
[POINTER(PA_CONTEXT), PA_CLIENT_INFO_CB_T, c_void_p] ),
pa_context_get_client_info=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_CLIENT_INFO_CB_T, c_void_p] ),
pa_context_get_server_info=( 'pa_op',
[POINTER(PA_CONTEXT), PA_SERVER_INFO_CB_T, c_void_p] ),
pa_context_get_card_info_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_CARD_INFO_CB_T, c_void_p] ),
pa_context_get_card_info_by_name=( 'pa_op',
[POINTER(PA_CONTEXT), c_str_p, PA_CARD_INFO_CB_T, c_void_p] ),
pa_context_get_card_info_list=( 'pa_op',
[POINTER(PA_CONTEXT), PA_CARD_INFO_CB_T, c_void_p] ),
pa_context_set_card_profile_by_index=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, c_str_p, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_get_module_info=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_MODULE_INFO_CB_T, c_void_p] ),
pa_context_get_module_info_list=( 'pa_op',
[POINTER(PA_CONTEXT), PA_MODULE_INFO_CB_T, c_void_p] ),
pa_context_load_module=( 'pa_op',
[POINTER(PA_CONTEXT), c_char_p, c_char_p, PA_CONTEXT_INDEX_CB_T, c_void_p] ),
pa_context_unload_module=( 'pa_op',
[POINTER(PA_CONTEXT), c_uint32, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_subscribe=( 'pa_op',
[POINTER(PA_CONTEXT), c_int, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_set_subscribe_callback=[POINTER(PA_CONTEXT), PA_SUBSCRIBE_CB_T, c_void_p],
pa_context_play_sample=( 'pa_op',
[POINTER(PA_CONTEXT), c_str_p, c_str_p, c_uint32, PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_context_play_sample_with_proplist=( 'pa_op',
[ POINTER(PA_CONTEXT), c_str_p, c_str_p, c_uint32,
POINTER(PA_PROPLIST), PA_CONTEXT_SUCCESS_CB_T, c_void_p ] ),
pa_ext_stream_restore_test=( 'pa_op',
[POINTER(PA_CONTEXT), PA_EXT_STREAM_RESTORE_TEST_CB_T, c_void_p] ),
pa_ext_stream_restore_read=( 'pa_op',
[POINTER(PA_CONTEXT), PA_EXT_STREAM_RESTORE_READ_CB_T, c_void_p] ),
pa_ext_stream_restore_write=( 'pa_op', [
POINTER(PA_CONTEXT), c_int, POINTER(PA_EXT_STREAM_RESTORE_INFO),
c_uint, c_int, PA_CONTEXT_SUCCESS_CB_T, c_void_p ] ),
pa_ext_stream_restore_delete=( 'pa_op',
[POINTER(PA_CONTEXT), POINTER(c_char_p), PA_CONTEXT_SUCCESS_CB_T, c_void_p] ),
pa_proplist_from_string=([c_str_p], POINTER(PA_PROPLIST)),
pa_proplist_iterate=([POINTER(PA_PROPLIST), POINTER(c_void_p)], c_str_p),
pa_proplist_gets=([POINTER(PA_PROPLIST), c_str_p], c_str_p),
pa_proplist_free=[POINTER(PA_PROPLIST)],
pa_channel_map_init_mono=(
[POINTER(PA_CHANNEL_MAP)], (POINTER(PA_CHANNEL_MAP), 'not_null') ),
pa_channel_map_init_stereo=(
[POINTER(PA_CHANNEL_MAP)], (POINTER(PA_CHANNEL_MAP), 'not_null') ),
pa_channel_map_snprint=([c_str_p, c_int, POINTER(PA_CHANNEL_MAP)], c_str_p),
pa_channel_map_parse=(
[POINTER(PA_CHANNEL_MAP), c_str_p], (POINTER(PA_CHANNEL_MAP), 'not_null') ),
pa_channel_position_to_string=([c_int], c_str_p),
pa_stream_new_with_proplist=(
[ POINTER(PA_CONTEXT), c_str_p,
POINTER(PA_SAMPLE_SPEC), POINTER(PA_CHANNEL_MAP), POINTER(PA_PROPLIST) ],
POINTER(PA_STREAM) ),
pa_stream_set_monitor_stream=([POINTER(PA_STREAM), c_uint32], 'int_check_ge0'),
pa_stream_set_read_callback=[POINTER(PA_STREAM), PA_STREAM_REQUEST_CB_T, c_void_p],
pa_stream_connect_record=(
[POINTER(PA_STREAM), c_str_p, POINTER(PA_BUFFER_ATTR), c_int], 'int_check_ge0' ),
pa_stream_unref=[POINTER(PA_STREAM)],
pa_stream_peek=(
[POINTER(PA_STREAM), POINTER(c_void_p), POINTER(c_int)], 'int_check_ge0' ),
pa_stream_drop=([POINTER(PA_STREAM)], 'int_check_ge0'),
pa_stream_disconnect=([POINTER(PA_STREAM)], 'int_check_ge0') )
class CallError(Exception): pass
def __init__(self):
p = CDLL(ctypes.util.find_library('libpulse') or 'libpulse.so.0')
self.funcs = dict()
for k, spec in self.func_defs.items():
func, args, res_proc = getattr(p, k), None, None
if spec:
if not isinstance(spec, tuple): spec = (spec,)
for v in spec:
assert v, [k, spec, v]
if isinstance(v, list): args = v
else: res_proc = v
func_k = k if not k.startswith('pa_') else k[3:]
self.funcs[func_k] = self._func_wrapper(k, func, args, res_proc)
def _func_wrapper(self, func_name, func, args=list(), res_proc=None):
func.restype, func.argtypes = None, args
if isinstance(res_proc, tuple): func.restype, res_proc = res_proc
if isinstance(res_proc, str):
if res_proc.startswith('int_check_'): func.restype = c_int
elif res_proc == 'pa_op': func.restype = POINTER(PA_OPERATION)
elif not func.restype and hasattr(res_proc, 'c_type'): func.restype = res_proc.c_type
elif not func.restype: func.restype, res_proc = res_proc, None
def _wrapper(*args):
# print('libpulse call:', func_name, args, file=sys.stderr)
# sys.stderr.flush()
res = func(*args)
if isinstance(res_proc, str):
assert res_proc in ['int_check_ge0', 'pa_op', 'not_null']
if (res_proc == 'int_check_ge0' and res < 0)\
or (res_proc == 'pa_op' and not res)\
or (res_proc == 'not_null' and not res):
err = [func_name, args, res]
if args and isinstance(getattr(args[0], 'contents', None), PA_CONTEXT):
errno_ = self.context_errno(args[0])
err.append('{} [pulse errno {}]'.format(self.strerror(errno_), errno_))
else: err.append('Return value check failed: {}'.format(res_proc))
raise self.CallError(*err)
elif res_proc: res = res_proc(res)
return res
_wrapper.__name__ = 'libpulse.{}'.format(func_name)
return _wrapper
def __getattr__(self, k): return self.funcs[k]
def return_value(self): return pointer(c_int())
pa = LibPulse()

View File

@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
import itertools as it, operator as op, functools as ft
import re
lookup_types = {
'sink': 'sink_list', 'source': 'source_list',
'sink-input': 'sink_input_list', 'source-output': 'source_output_list' }
lookup_types.update(it.chain.from_iterable(
((v, lookup_types[k]) for v in v) for k,v in
{ 'source': ['src'], 'sink-input': ['si', 'playback', 'play'],
'source-output': ['so', 'record', 'rec', 'mic'] }.items() ))
lookup_key_defaults = dict(
# No default keys for type = no implicit matches for that type
sink_input_list=[ # match sink_input_list objects with these keys by default
'media.name', 'media.icon_name', 'media.role',
'application.name', 'application.process.binary', 'application.icon_name' ] )
def pulse_obj_lookup(pulse, obj_lookup, prop_default=None):
'''Return set of pulse object(s) with proplist values matching lookup-string.
Pattern syntax:
[ { 'sink' | 'source' | 'sink-input' | 'source-output' } [ / ... ] ':' ]
[ proplist-key-name (non-empty) [ / ... ] ':' ] [ ':' (for regexp match) ]
[ proplist-key-value ]
Examples:
- sink:alsa.driver_name:snd_hda_intel
Match sink(s) with alsa.driver_name=snd_hda_intel (exact match!).
- sink/source:device.bus:pci
Match all sinks and sources with device.bus=pci.
- myprop:somevalue
Match any object (of all 4 supported types) that has myprop=somevalue.
- mpv
Match any object with any of the "default lookup props" (!!!) being equal to "mpv".
"default lookup props" are specified per-type in lookup_key_defaults above.
For example, sink input will be looked-up by media.name, application.name, etc.
- sink-input/source-output:mpv
Same as above, but lookup streams only (not sinks/sources).
Note that "sink-input/source-output" matches type spec, and parsed as such, not as key.
- si/so:mpv
Same as above - see aliases for types in lookup_types.
- application.binary/application.icon:mpv
Lookup by multiple keys with "any match" logic, same as with multiple object types.
- key\/with\/slashes\:and\:colons:somevalue
Lookup by key that has slashes and colons in it.
"/" and ":" must only be escaped in the proplist key part, used as-is in values.
Backslash itself can be escaped as well, i.e. as "\\".
- module-stream-restore.id:sink-input-by-media-role:music
Value has ":" in it, but there's no need to escape it in any way.
- device.description::Analog
Value lookup starting with : is interpreted as a regexp,
i.e. any object with device.description *containing* "Analog" in this case.
- si/so:application.name::^mpv\b
Return all sink-inputs/source-outputs ("si/so") where
"application.name" proplist value matches regexp "^mpv\b".
- :^mpv\b
Regexp lookup (stuff starting with "mpv" word) without type or key specification.
For python2, lookup string should be unicode type.
"prop_default" keyword arg can be used to specify
default proplist value for when key is not found there.'''
# \ue000-\uf8ff - private use area, never assigned to symbols
obj_lookup = obj_lookup.replace('\\\\', '\ue000').replace('\\:', '\ue001')
obj_types_re = '({0})(/({0}))*'.format('|'.join(lookup_types))
m = re.search(
( r'^((?P<t>{}):)?'.format(obj_types_re) +
r'((?P<k>.+?):)?' r'(?P<v>.*)$' ), obj_lookup, re.IGNORECASE )
if not m: raise ValueError(obj_lookup)
lookup_type, lookup_keys, lookup_re = op.itemgetter('t', 'k', 'v')(m.groupdict())
if lookup_keys:
lookup_keys = list(
v.replace('\ue000', '\\\\').replace('\ue001', ':').replace('\ue002', '/')
for v in lookup_keys.replace('\\/', '\ue002').split('/') )
lookup_re = lookup_re.replace('\ue000', '\\\\').replace('\ue001', '\\:')
obj_list_res, lookup_re = list(), re.compile( lookup_re[1:]
if lookup_re.startswith(':') else '^{}$'.format(re.escape(lookup_re)) )
for k in set( lookup_types[k] for k in
(lookup_type.split('/') if lookup_type else lookup_types.keys()) ):
if not lookup_keys: lookup_keys = lookup_key_defaults.get(k)
if not lookup_keys: continue
obj_list = getattr(pulse, k)()
if not obj_list: continue
for obj, k in it.product(obj_list, lookup_keys):
v = obj.proplist.get(k, prop_default)
if v is None: continue
if lookup_re.search(v): obj_list_res.append(obj)
return set(obj_list_res)

View File

@ -0,0 +1,992 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import itertools as it, operator as op, functools as ft
from collections import defaultdict
from contextlib import contextmanager
import os, sys, inspect, traceback
from . import _pulsectl as c
if sys.version_info.major >= 3:
long, unicode = int, str
print_err = ft.partial(print, file=sys.stderr, flush=True)
def wrapper_with_sig_info(func, wrapper, index_arg=False):
sig = inspect.signature(func or (lambda: None))
if index_arg:
sig = sig.replace(parameters=[inspect.Parameter( 'index',
inspect.Parameter.POSITIONAL_OR_KEYWORD )] + list(sig.parameters.values()))
wrapper.__name__, wrapper.__signature__, wrapper.__doc__ = '', sig, func.__doc__
return wrapper
else:
range, map = xrange, it.imap
def print_err(*args, **kws):
kws.setdefault('file', sys.stderr)
print(*args, **kws)
kws['file'].flush()
def wrapper_with_sig_info(func, wrapper, index_arg=False):
func_args = list(inspect.getargspec(func or (lambda: None)))
func_args[0] = list(func_args[0])
if index_arg: func_args[0] = ['index'] + func_args[0]
wrapper.__name__ = '...'
wrapper.__doc__ = 'Signature: func' + inspect.formatargspec(*func_args)
if func.__doc__: wrapper.__doc__ += '\n\n' + func.__doc__
return wrapper
is_str = lambda v,ext=None,native=False: (
isinstance(v, ( (unicode, bytes)
if not native else (str,) ) + ((ext,) if ext else ())) )
is_str_native = ft.partial(is_str, native=True)
is_num = lambda v: isinstance(v, (int, float, long))
is_list = lambda v: isinstance(v, (tuple, list))
is_dict = lambda v: isinstance(v, dict)
def assert_pulse_object(obj):
if not isinstance(obj, PulseObject):
raise TypeError( 'Pulse<something>Info'
' object is required instead of value: [{}] {}', type(obj), obj )
class FakeLock():
def __enter__(self): return self
def __exit__(self, *err): pass
@ft.total_ordering
class EnumValue(object):
'String-based enum value, comparable to native strings.'
__slots__ = '_t', '_value', '_c_val'
def __init__(self, t, value, c_value=None):
self._t, self._value, self._c_val = t, value, c_value
def __repr__(self): return '<EnumValue {}={}>'.format(self._t, self._value)
def __eq__(self, val):
if isinstance(val, EnumValue): val = val._value
return self._value == val
def __ne__(self, val): return not (self == val)
def __lt__(self, val):
if isinstance(val, EnumValue): val = val._value
return self._value < val
def __hash__(self): return hash(self._value)
class Enum(object):
def __init__(self, name, values_or_map):
vals = values_or_map
if is_str_native(vals): vals = vals.split()
if is_list(vals): vals = zip(it.repeat(None), vals)
if is_dict(vals): vals = vals.items()
self._name, self._values, self._c_vals = name, dict(), dict()
for c_val, k in vals:
v = EnumValue(name, k, c_val)
setattr(self, k.replace('-', '_'), v)
self._c_vals[c_val] = self._values[k] = v
def __getitem__(self, k, *default):
if isinstance(k, EnumValue):
t, k, v = k._t, k._value, k
if t != self._name: raise KeyError(v)
try: return getattr(self, k.replace('-', '_'), *default)
except AttributeError: raise KeyError(k)
def _get(self, k, default=None): return self.__getitem__(k, default)
def __contains__(self, k): return self._get(k) is not None
def _c_val(self, c_val, default=KeyError):
v = self._c_vals.get(c_val)
if v is not None: return v
if default is not KeyError:
return EnumValue(self._name, default, c_val)
raise KeyError(c_val)
def __repr__(self):
return '<Enum {} [{}]>'.format(self._name, ' '.join(sorted(self._values.keys())))
PulseEventTypeEnum = Enum('event-type', c.PA_EVENT_TYPE_MAP)
PulseEventFacilityEnum = Enum('event-facility', c.PA_EVENT_FACILITY_MAP)
PulseEventMaskEnum = Enum('event-mask', c.PA_EVENT_MASK_MAP)
PulseStateEnum = Enum('sink/source-state', c.PA_OBJ_STATE_MAP)
PulseUpdateEnum = Enum('update-type', c.PA_UPDATE_MAP)
PulsePortAvailableEnum = Enum('available', c.PA_PORT_AVAILABLE_MAP)
PulseDirectionEnum = Enum('direction', c.PA_DIRECTION_MAP)
class PulseError(Exception): pass
class PulseOperationFailed(PulseError): pass
class PulseOperationInvalid(PulseOperationFailed): pass
class PulseIndexError(PulseError): pass
class PulseLoopStop(Exception): pass
class PulseDisconnected(Exception): pass
class PulseObject(object):
c_struct_wrappers = dict()
def __init__(self, struct=None, *field_data_list, **field_data_dict):
field_data, fields = dict(), getattr(self, 'c_struct_fields', list())
if is_str_native(fields): fields = self.c_struct_fields = fields.split()
if field_data_list: field_data.update(zip(fields, field_data_list))
if field_data_dict: field_data.update(field_data_dict)
if struct is None: field_data, struct = dict(), field_data
assert not set(field_data.keys()).difference(fields)
if field_data: self._copy_struct_fields(field_data, fields=field_data.keys())
self._copy_struct_fields(struct, fields=set(fields).difference(field_data.keys()))
if struct:
if hasattr(struct, 'proplist'):
self.proplist, state = dict(), c.c_void_p()
while True:
k = c.pa.proplist_iterate(struct.proplist, c.byref(state))
if not k: break
self.proplist[c.force_str(k)] = c.force_str(c.pa.proplist_gets(struct.proplist, k))
if hasattr(struct, 'volume'):
self.volume = self._get_wrapper(PulseVolumeInfo)(struct.volume)
if hasattr(struct, 'base_volume'):
self.base_volume = struct.base_volume / c.PA_VOLUME_NORM
if hasattr(struct, 'n_ports'):
cls_port = self._get_wrapper(PulsePortInfo)
self.port_list = list(
cls_port(struct.ports[n].contents) for n in range(struct.n_ports) )
if hasattr(struct, 'active_port'):
cls_port = self._get_wrapper(PulsePortInfo)
self.port_active = (
None if not struct.active_port else cls_port(struct.active_port.contents) )
if hasattr(struct, 'channel_map'):
self.channel_count, self.channel_list = struct.channel_map.channels, list()
self.channel_list_raw = struct.channel_map.map[:self.channel_count]
if self.channel_count > 0:
s = c.create_string_buffer(b'\0' * 512)
c.pa.channel_map_snprint(s, len(s), struct.channel_map)
self.channel_list.extend(map(c.force_str, s.value.strip().split(b',')))
if hasattr(struct, 'state'):
self.state = PulseStateEnum._c_val(
struct.state, u'state.{}'.format(struct.state) )
self.state_values = sorted(PulseStateEnum._values.values())
if hasattr(struct, 'corked'): self.corked = bool(struct.corked)
self._init_from_struct(struct)
def _get_wrapper(self, cls_base):
return self.c_struct_wrappers.get(cls_base, cls_base)
def _copy_struct_fields(self, struct, fields=None, str_errors='strict'):
if not fields: fields = self.c_struct_fields
for k in fields:
setattr(self, k, c.force_str( getattr(struct, k)
if not is_dict(struct) else struct[k], str_errors ))
def _init_from_struct(self, struct): pass # to parse fields in subclasses
def _as_str(self, ext=None, fields=None, **kws):
kws = list(it.starmap('{}={}'.format, kws.items()))
if fields:
if is_str_native(fields): fields = fields.split()
kws.extend('{}={!r}'.format(k, getattr(self, k)) for k in fields)
kws = sorted(kws)
if ext: kws.append(str(ext))
return ', '.join(kws)
def __str__(self):
return self._as_str(fields=self.c_struct_fields)
def __repr__(self):
return '<{} at {:x} - {}>'.format(self.__class__.__name__, id(self), str(self))
class PulsePortInfo(PulseObject):
c_struct_fields = 'name description available priority'
def _init_from_struct(self, struct):
self.available = PulsePortAvailableEnum._c_val(struct.available)
self.available_state = self.available # for compatibility with <=17.6.0
def __eq__(self, o):
if not isinstance(o, PulsePortInfo): raise TypeError(o)
return self.name == o.name
def __hash__(self): return hash(self.name)
class PulseClientInfo(PulseObject):
c_struct_fields = 'name index driver owner_module'
class PulseServerInfo(PulseObject):
c_struct_fields = ( 'user_name host_name'
' server_version server_name default_sink_name default_source_name cookie' )
class PulseModuleInfo(PulseObject):
c_struct_fields = 'index name argument n_used auto_unload'
class PulseSinkInfo(PulseObject):
c_struct_fields = ( 'index name mute'
' description sample_spec owner_module latency driver'
' monitor_source monitor_source_name flags configured_latency card' )
def __str__(self):
return self._as_str(self.volume, fields='index name description mute')
class PulseSinkInputInfo(PulseObject):
c_struct_fields = ( 'index name mute corked client'
' owner_module sink sample_spec'
' buffer_usec sink_usec resample_method driver' )
def __str__(self):
return self._as_str(fields='index name mute')
class PulseSourceInfo(PulseObject):
c_struct_fields = ( 'index name mute'
' description sample_spec owner_module latency driver monitor_of_sink'
' monitor_of_sink_name flags configured_latency card' )
def __str__(self):
return self._as_str(self.volume, fields='index name description mute')
class PulseSourceOutputInfo(PulseObject):
c_struct_fields = ( 'index name mute corked client'
' owner_module source sample_spec'
' buffer_usec source_usec resample_method driver' )
def __str__(self):
return self._as_str(fields='index name mute')
class PulseCardProfileInfo(PulseObject):
c_struct_fields = 'name description n_sinks n_sources priority available'
class PulseCardPortInfo(PulsePortInfo):
c_struct_fields = 'name description available priority direction latency_offset'
def _init_from_struct(self, struct):
super(PulseCardPortInfo, self)._init_from_struct(struct)
self.direction = PulseDirectionEnum._c_val(struct.direction)
class PulseCardInfo(PulseObject):
c_struct_fields = 'name index driver owner_module n_profiles'
c_struct_wrappers = {PulsePortInfo: PulseCardPortInfo}
def __init__(self, struct):
super(PulseCardInfo, self).__init__(struct)
self.profile_list = list(
PulseCardProfileInfo(struct.profiles2[n][0]) for n in range(self.n_profiles) )
self.profile_active = PulseCardProfileInfo(struct.active_profile2.contents)
def __str__(self):
return self._as_str(
fields='name index driver n_profiles',
profile_active='[{}]'.format(self.profile_active.name) )
class PulseVolumeInfo(PulseObject):
def __init__(self, struct_or_values=None, channels=None):
if is_num(struct_or_values):
assert channels is not None, 'Channel count specified if volume value is not a list.'
self.values = [struct_or_values] * channels
elif is_list(struct_or_values): self.values = struct_or_values
else:
self.values = list( (x / c.PA_VOLUME_NORM)
for x in map(float, struct_or_values.values[:struct_or_values.channels]) )
@property
def value_flat(self): return (sum(self.values) / float(len(self.values))) if self.values else 0
@value_flat.setter
def value_flat(self, v): self.values = [v] * len(self.values)
def to_struct(self):
return c.PA_CVOLUME(
len(self.values), tuple(min( c.PA_VOLUME_UI_MAX,
int(round(v * c.PA_VOLUME_NORM)) ) for v in self.values) )
def __str__(self):
return self._as_str(
channels=len(self.values), volumes='[{}]'.format(
' '.join('{}%'.format(int(round(v*100))) for v in self.values) ) )
class PulseExtStreamRestoreInfo(PulseObject):
c_struct_fields = 'name channel_map volume mute device'
@classmethod
def struct_from_value( cls, name, volume,
channel_list=None, mute=False, device=None ):
'Same arguments as with class instance init.'
chan_map = c.PA_CHANNEL_MAP()
if not channel_list: c.pa.channel_map_init_mono(chan_map)
else:
if not is_str(channel_list):
channel_list = b','.join(map(c.force_bytes, channel_list))
c.pa.channel_map_parse(chan_map, channel_list)
if not isinstance(volume, PulseVolumeInfo):
volume = PulseVolumeInfo(volume, chan_map.channels)
struct = c.PA_EXT_STREAM_RESTORE_INFO(
name=c.force_bytes(name),
mute=int(bool(mute)), device=c.force_bytes(device),
channel_map=chan_map, volume=volume.to_struct() )
return struct
def __init__( self, struct_or_name=None,
volume=None, channel_list=None, mute=False, device=None ):
'''If string name is passed instead of C struct, will be initialized from args/kws.
"volume" can be either a float number
(same level for all channels) or list (value per channel).
"channel_list" can be a pulse channel map string (comma-separated) or list
of channel names. Defaults to stereo map, should probably match volume channels.
"device" - name of sink/source or None (default).'''
if is_str(struct_or_name):
struct_or_name = self.struct_from_value(
struct_or_name, volume, channel_list, mute, device )
super(PulseExtStreamRestoreInfo, self).__init__(struct_or_name)
def to_struct(self):
return self.struct_from_value(**dict(
(k, getattr(self, k)) for k in 'name volume channel_list mute device'.split() ))
def __str__(self):
return self._as_str(self.volume, fields='name mute device')
class PulseEventInfo(PulseObject):
def __init__(self, ev_t, facility, index):
self.t, self.facility, self.index = ev_t, facility, index
def __str__(self):
return self._as_str(fields='t facility index'.split())
class Pulse(object):
_ctx = None
def __init__(self, client_name=None, server=None, connect=True, threading_lock=False):
'''Connects to specified pulse server by default.
Specifying "connect=False" here prevents that, but be sure to call connect() later.
"connect=False" can also be used here to
have control over options passed to connect() method.
"threading_lock" option (either bool or lock instance) can be used to wrap
non-threadsafe eventloop polling (can only be done from one thread at a time)
into a mutex lock, and should only be needed if same-instance methods
will/should/might be called from different threads at the same time.'''
self.name = client_name or 'pulsectl'
self.server, self.connected = server, None
self._ret = self._ctx = self._loop = self._api = None
self._actions, self._action_ids = dict(),\
it.chain.from_iterable(map(range, it.repeat(2**30)))
self.init()
if threading_lock:
if threading_lock is True:
import threading
threading_lock = threading.Lock()
self._loop_lock = threading_lock
if connect:
try: self.connect(autospawn=True)
except PulseError:
self.close()
raise
def init(self):
self._pa_state_cb = c.PA_STATE_CB_T(self._pulse_state_cb)
self._pa_subscribe_cb = c.PA_SUBSCRIBE_CB_T(self._pulse_subscribe_cb)
self._loop, self._loop_lock = c.pa.mainloop_new(), FakeLock()
self._loop_running = self._loop_closed = False
self._api = c.pa.mainloop_get_api(self._loop)
self._ret = c.pa.return_value()
self._ctx_init()
self.event_types = sorted(PulseEventTypeEnum._values.values())
self.event_facilities = sorted(PulseEventFacilityEnum._values.values())
self.event_masks = sorted(PulseEventMaskEnum._values.values())
self.event_callback = None
chan_names = dict()
for n in range(256):
name = c.pa.channel_position_to_string(n)
if name is None: break
chan_names[n] = name
self.channel_list_enum = Enum('channel_pos', chan_names)
def _ctx_init(self):
if self._ctx:
with self._loop_lock:
self.disconnect()
c.pa.context_unref(self._ctx)
self._ctx = c.pa.context_new(self._api, self.name)
c.pa.context_set_state_callback(self._ctx, self._pa_state_cb, None)
c.pa.context_set_subscribe_callback(self._ctx, self._pa_subscribe_cb, None)
def connect(self, autospawn=False, wait=False, timeout=None):
'''Connect to pulseaudio server.
"autospawn" option will start new pulse daemon, if necessary.
Specifying "wait" option will make function block until pulseaudio server appears.
"timeout" (in seconds) will raise PulseError if connection not established within it.'''
if self._loop_closed:
raise PulseError('Eventloop object was already'
' destroyed and cannot be reused from this instance.')
if self.connected is not None: self._ctx_init()
flags, self.connected = 0, None
if not autospawn: flags |= c.PA_CONTEXT_NOAUTOSPAWN
if wait: flags |= c.PA_CONTEXT_NOFAIL
try: c.pa.context_connect(self._ctx, self.server, flags, None)
except c.pa.CallError: self.connected = False
if not timeout: # simplier process
while self.connected is None: self._pulse_iterate()
else:
self._loop_stop, delta, ts_deadline = True, 1, c.mono_time() + timeout
while self.connected is None:
delta = ts_deadline - c.mono_time()
self._pulse_poll(delta)
if delta <= 0: break
self._loop_stop = False
if not self.connected:
c.pa.context_disconnect(self._ctx)
while self.connected is not False: self._pulse_iterate()
raise PulseError('Timed-out connecting to pulseaudio server [{:,.1f}s]'.format(timeout))
if self.connected is False: raise PulseError('Failed to connect to pulseaudio server')
def disconnect(self):
if not self._ctx or not self.connected: return
c.pa.context_disconnect(self._ctx)
def close(self):
if not self._loop: return
if self._loop_running: # called from another thread
self._loop_closed = True
c.pa.mainloop_quit(self._loop, 0)
return # presumably will be closed in a thread that's running it
with self._loop_lock:
try:
self.disconnect()
c.pa.context_unref(self._ctx)
c.pa.mainloop_free(self._loop)
finally: self._ctx = self._loop = None
def __enter__(self): return self
def __exit__(self, err_t, err, err_tb): self.close()
def _pulse_state_cb(self, ctx, userdata):
state = c.pa.context_get_state(ctx)
if state >= c.PA_CONTEXT_READY:
if state == c.PA_CONTEXT_READY: self.connected = True
elif state in [c.PA_CONTEXT_FAILED, c.PA_CONTEXT_TERMINATED]:
self.connected, self._loop_stop = False, True
def _pulse_subscribe_cb(self, ctx, ev, idx, userdata):
if not self.event_callback: return
n = ev & c.PA_SUBSCRIPTION_EVENT_FACILITY_MASK
ev_fac = PulseEventFacilityEnum._c_val(n, 'ev.facility.{}'.format(n))
n = ev & c.PA_SUBSCRIPTION_EVENT_TYPE_MASK
ev_t = PulseEventTypeEnum._c_val(n, 'ev.type.{}'.format(n))
try: self.event_callback(PulseEventInfo(ev_t, ev_fac, idx))
except PulseLoopStop: self._loop_stop = True
def _pulse_poll_cb(self, func, func_err, ufds, nfds, timeout, userdata):
fd_list = list(ufds[n] for n in range(nfds))
try: nfds = func(fd_list, timeout / 1000.0)
except Exception as err:
func_err(*sys.exc_info())
return -1
return nfds
@contextmanager
def _pulse_loop(self):
with self._loop_lock:
if not self._loop: return
if self._loop_running:
raise PulseError(
'Running blocking pulse operations from pulse eventloop callbacks'
' or other threads while loop is running is not supported by this python module.'
' Supporting this would require threads or proper asyncio/twisted-like async code.'
' Workaround can be to stop the loop'
' (raise PulseLoopStop in callback or event_loop_stop() from another thread),'
' doing whatever pulse calls synchronously and then resuming event_listen() loop.' )
self._loop_running, self._loop_stop = True, False
try: yield self._loop
finally:
self._loop_running = False
if self._loop_closed: self.close() # to free() after stopping it
def _pulse_run(self):
with self._pulse_loop() as loop: c.pa.mainloop_run(loop, self._ret)
def _pulse_iterate(self, block=True):
with self._pulse_loop() as loop: c.pa.mainloop_iterate(loop, int(block), self._ret)
@contextmanager
def _pulse_op_cb(self, raw=False):
act_id = next(self._action_ids)
self._actions[act_id] = None
try:
cb = lambda s=True,k=act_id: self._actions.update({k: bool(s)})
if not raw: cb = c.PA_CONTEXT_SUCCESS_CB_T(lambda ctx,s,d,cb=cb: cb(s))
yield cb
while self.connected and self._actions[act_id] is None: self._pulse_iterate()
if not self._actions[act_id]: raise PulseOperationFailed(act_id)
finally: self._actions.pop(act_id, None)
def _pulse_poll(self, timeout=None):
'''timeout should be in seconds (float),
0 for non-blocking poll and None (default) for no timeout.'''
with self._pulse_loop() as loop:
ts = c.mono_time()
ts_deadline = timeout and (ts + timeout)
while True:
delay = max(0, int((ts_deadline - ts) * 1000000)) if ts_deadline else -1
c.pa.mainloop_prepare(loop, delay) # delay in us
c.pa.mainloop_poll(loop)
if self._loop_closed: break # interrupted by close() or such
c.pa.mainloop_dispatch(loop)
if self._loop_stop: break
ts = c.mono_time()
if ts_deadline and ts >= ts_deadline: break
def _pulse_info_cb(self, info_cls, data_list, done_cb, ctx, info, eof, userdata):
# No idea where callbacks with "userdata != NULL" come from,
# but "info" pointer in them is always invalid, so they are discarded here.
# Looks like some kind of mixup or corruption in libpulse memory?
# See also: https://github.com/mk-fg/python-pulse-control/issues/35
if userdata is not None: return
# Empty result list and conn issues are checked elsewhere.
# Errors here are non-descriptive (errno), so should not be useful anyway.
# if eof < 0: done_cb(s=False)
if eof: done_cb()
else: data_list.append(info_cls(info[0]))
def _pulse_get_list(cb_t, pulse_func, info_cls, singleton=False, index_arg=True):
def _wrapper_method(self, index=None):
data = list()
with self._pulse_op_cb(raw=True) as cb:
cb = cb_t(
ft.partial(self._pulse_info_cb, info_cls, data, cb) if not singleton else
lambda ctx, info, userdata, cb=cb: data.append(info_cls(info[0])) or cb() )
pa_op = pulse_func( self._ctx,
*([index, cb, None] if index is not None else [cb, None]) )
c.pa.operation_unref(pa_op)
data = data or list()
if index is not None or singleton:
if not data: raise PulseIndexError(index)
data, = data
return data
return wrapper_with_sig_info( None, _wrapper_method,
not (pulse_func.__name__.endswith('_list') or singleton or not index_arg) )
get_sink_by_name = _pulse_get_list(
c.PA_SINK_INFO_CB_T,
c.pa.context_get_sink_info_by_name, PulseSinkInfo )
get_source_by_name = _pulse_get_list(
c.PA_SOURCE_INFO_CB_T,
c.pa.context_get_source_info_by_name, PulseSourceInfo )
get_card_by_name = _pulse_get_list(
c.PA_CARD_INFO_CB_T,
c.pa.context_get_card_info_by_name, PulseCardInfo )
sink_input_list = _pulse_get_list(
c.PA_SINK_INPUT_INFO_CB_T,
c.pa.context_get_sink_input_info_list, PulseSinkInputInfo )
sink_input_info = _pulse_get_list(
c.PA_SINK_INPUT_INFO_CB_T,
c.pa.context_get_sink_input_info, PulseSinkInputInfo )
source_output_list = _pulse_get_list(
c.PA_SOURCE_OUTPUT_INFO_CB_T,
c.pa.context_get_source_output_info_list, PulseSourceOutputInfo )
source_output_info = _pulse_get_list(
c.PA_SOURCE_OUTPUT_INFO_CB_T,
c.pa.context_get_source_output_info, PulseSourceOutputInfo )
sink_list = _pulse_get_list(
c.PA_SINK_INFO_CB_T, c.pa.context_get_sink_info_list, PulseSinkInfo )
sink_info = _pulse_get_list(
c.PA_SINK_INFO_CB_T, c.pa.context_get_sink_info_by_index, PulseSinkInfo )
source_list = _pulse_get_list(
c.PA_SOURCE_INFO_CB_T, c.pa.context_get_source_info_list, PulseSourceInfo )
source_info = _pulse_get_list(
c.PA_SOURCE_INFO_CB_T, c.pa.context_get_source_info_by_index, PulseSourceInfo )
card_list = _pulse_get_list(
c.PA_CARD_INFO_CB_T, c.pa.context_get_card_info_list, PulseCardInfo )
card_info = _pulse_get_list(
c.PA_CARD_INFO_CB_T, c.pa.context_get_card_info_by_index, PulseCardInfo )
client_list = _pulse_get_list(
c.PA_CLIENT_INFO_CB_T, c.pa.context_get_client_info_list, PulseClientInfo )
client_info = _pulse_get_list(
c.PA_CLIENT_INFO_CB_T, c.pa.context_get_client_info, PulseClientInfo )
server_info = _pulse_get_list(
c.PA_SERVER_INFO_CB_T, c.pa.context_get_server_info, PulseServerInfo, singleton=True )
module_info = _pulse_get_list(
c.PA_MODULE_INFO_CB_T, c.pa.context_get_module_info, PulseModuleInfo )
module_list = _pulse_get_list(
c.PA_MODULE_INFO_CB_T, c.pa.context_get_module_info_list, PulseModuleInfo )
def _pulse_method_call(pulse_op, func=None, index_arg=True):
'''Creates following synchronous wrapper for async pa_operation callable:
wrapper(index, ...) -> pulse_op(index, [*]args_func(...))
index_arg=False: wrapper(...) -> pulse_op([*]args_func(...))'''
def _wrapper(self, *args, **kws):
if index_arg:
if 'index' in kws: index = kws.pop('index')
else: index, args = args[0], args[1:]
pulse_args = func(*args, **kws) if func else list()
if not is_list(pulse_args): pulse_args = [pulse_args]
if index_arg: pulse_args = [index] + list(pulse_args)
with self._pulse_op_cb() as cb:
try: pulse_op(self._ctx, *(list(pulse_args) + [cb, None]))
except c.ArgumentError as err: raise TypeError(err.args)
except c.pa.CallError as err: raise PulseOperationInvalid(err.args[-1])
return wrapper_with_sig_info(func, _wrapper, index_arg)
card_profile_set_by_index = _pulse_method_call(
c.pa.context_set_card_profile_by_index, lambda profile_name: profile_name )
sink_default_set = _pulse_method_call(
c.pa.context_set_default_sink, index_arg=False,
func=lambda sink: sink.name if isinstance(sink, PulseSinkInfo) else sink )
source_default_set = _pulse_method_call(
c.pa.context_set_default_source, index_arg=False,
func=lambda source: source.name if isinstance(source, PulseSourceInfo) else source )
sink_input_mute = _pulse_method_call(
c.pa.context_set_sink_input_mute, lambda mute=True: mute )
sink_input_move = _pulse_method_call(
c.pa.context_move_sink_input_by_index, lambda sink_index: sink_index )
sink_mute = _pulse_method_call(
c.pa.context_set_sink_mute_by_index, lambda mute=True: mute )
sink_input_volume_set = _pulse_method_call(
c.pa.context_set_sink_input_volume, lambda vol: vol.to_struct() )
sink_volume_set = _pulse_method_call(
c.pa.context_set_sink_volume_by_index, lambda vol: vol.to_struct() )
sink_suspend = _pulse_method_call(
c.pa.context_suspend_sink_by_index, lambda suspend=True: suspend )
sink_port_set = _pulse_method_call(
c.pa.context_set_sink_port_by_index,
lambda port: port.name if isinstance(port, PulsePortInfo) else port )
source_output_mute = _pulse_method_call(
c.pa.context_set_source_output_mute, lambda mute=True: mute )
source_output_move = _pulse_method_call(
c.pa.context_move_source_output_by_index, lambda sink_index: sink_index )
source_mute = _pulse_method_call(
c.pa.context_set_source_mute_by_index, lambda mute=True: mute )
source_output_volume_set = _pulse_method_call(
c.pa.context_set_source_output_volume, lambda vol: vol.to_struct() )
source_volume_set = _pulse_method_call(
c.pa.context_set_source_volume_by_index, lambda vol: vol.to_struct() )
source_suspend = _pulse_method_call(
c.pa.context_suspend_source_by_index, lambda suspend=True: suspend )
source_port_set = _pulse_method_call(
c.pa.context_set_source_port_by_index,
lambda port: port.name if isinstance(port, PulsePortInfo) else port )
def module_load(self, name, args=''):
if is_list(args): args = ' '.join(args)
name, args = map(c.force_bytes, [name, args])
data = list()
with self._pulse_op_cb(raw=True) as cb:
cb = c.PA_CONTEXT_INDEX_CB_T(
lambda ctx, index, userdata, cb=cb: data.append(index) or cb() )
try: c.pa.context_load_module(self._ctx, name, args, cb, None)
except c.pa.CallError as err: raise PulseOperationInvalid(err.args[-1])
index, = data
if index == c.PA_INVALID:
raise PulseError('Failed to load module: {} {}'.format(name, args))
return index
module_unload = _pulse_method_call(c.pa.context_unload_module, None)
def stream_restore_test(self):
'Returns module-stream-restore version int (e.g. 1) or None if it is unavailable.'
data = list()
with self._pulse_op_cb(raw=True) as cb:
cb = c.PA_EXT_STREAM_RESTORE_TEST_CB_T(
lambda ctx, version, userdata, cb=cb: data.append(version) or cb() )
try: c.pa.ext_stream_restore_test(self._ctx, cb, None)
except c.pa.CallError as err: raise PulseOperationInvalid(err.args[-1])
version, = data
return version if version != c.PA_INVALID else None
stream_restore_read = _pulse_get_list(
c.PA_EXT_STREAM_RESTORE_READ_CB_T,
c.pa.ext_stream_restore_read, PulseExtStreamRestoreInfo, index_arg=False )
stream_restore_list = stream_restore_read # for consistency with other *_list methods
@ft.partial(_pulse_method_call, c.pa.ext_stream_restore_write, index_arg=False)
def stream_restore_write( obj_name_or_list,
mode='merge', apply_immediately=False, **obj_kws ):
'''Update module-stream-restore db entry for specified name.
Can be passed PulseExtStreamRestoreInfo object or list of them as argument,
or name string there and object init keywords (e.g. volume, mute, channel_list, etc).
"mode" is PulseUpdateEnum value of
'merge' (default), 'replace' or 'set' (replaces ALL entries!!!).'''
mode = PulseUpdateEnum[mode]._c_val
if is_str(obj_name_or_list):
obj_name_or_list = PulseExtStreamRestoreInfo(obj_name_or_list, **obj_kws)
if isinstance(obj_name_or_list, PulseExtStreamRestoreInfo):
obj_name_or_list = [obj_name_or_list]
# obj_array is an array of structs, laid out contiguously in memory, not pointers
obj_array = (c.PA_EXT_STREAM_RESTORE_INFO * len(obj_name_or_list))()
for n, obj in enumerate(obj_name_or_list):
obj_struct, dst_struct = obj.to_struct(), obj_array[n]
for k,t in obj_struct._fields_: setattr(dst_struct, k, getattr(obj_struct, k))
return mode, obj_array, len(obj_array), int(bool(apply_immediately))
@ft.partial(_pulse_method_call, c.pa.ext_stream_restore_delete, index_arg=False)
def stream_restore_delete(obj_name_or_list):
'''Can be passed string name,
PulseExtStreamRestoreInfo object or a list of any of these.'''
if is_str(obj_name_or_list, PulseExtStreamRestoreInfo):
obj_name_or_list = [obj_name_or_list]
name_list = list((obj.name if isinstance( obj,
PulseExtStreamRestoreInfo ) else obj) for obj in obj_name_or_list)
name_struct = (c.c_char_p * len(name_list))()
name_struct[:] = list(map(c.force_bytes, name_list))
return [name_struct]
def default_set(self, obj):
'Set passed sink or source to be used as default one by pulseaudio server.'
assert_pulse_object(obj)
method = {
PulseSinkInfo: self.sink_default_set,
PulseSourceInfo: self.source_default_set }.get(type(obj))
if not method: raise NotImplementedError(type(obj))
method(obj)
def mute(self, obj, mute=True):
assert_pulse_object(obj)
method = {
PulseSinkInfo: self.sink_mute,
PulseSinkInputInfo: self.sink_input_mute,
PulseSourceInfo: self.source_mute,
PulseSourceOutputInfo: self.source_output_mute }.get(type(obj))
if not method: raise NotImplementedError(type(obj))
method(obj.index, mute)
obj.mute = mute
def port_set(self, obj, port):
assert_pulse_object(obj)
method = {
PulseSinkInfo: self.sink_port_set,
PulseSourceInfo: self.source_port_set }.get(type(obj))
if not method: raise NotImplementedError(type(obj))
method(obj.index, port)
obj.port_active = port
def card_profile_set(self, card, profile):
assert_pulse_object(card)
if is_str(profile):
profile_dict = dict((p.name, p) for p in card.profile_list)
if profile not in profile_dict:
raise PulseIndexError( 'Card does not have'
' profile with specified name: {!r}'.format(profile) )
profile = profile_dict[profile]
self.card_profile_set_by_index(card.index, profile.name)
card.profile_active = profile
def volume_set(self, obj, vol):
assert_pulse_object(obj)
method = {
PulseSinkInfo: self.sink_volume_set,
PulseSinkInputInfo: self.sink_input_volume_set,
PulseSourceInfo: self.source_volume_set,
PulseSourceOutputInfo: self.source_output_volume_set }.get(type(obj))
if not method: raise NotImplementedError(type(obj))
method(obj.index, vol)
obj.volume = vol
def volume_set_all_chans(self, obj, vol):
assert_pulse_object(obj)
obj.volume.value_flat = vol
self.volume_set(obj, obj.volume)
def volume_change_all_chans(self, obj, inc):
assert_pulse_object(obj)
obj.volume.values = [max(0, v + inc) for v in obj.volume.values]
self.volume_set(obj, obj.volume)
def volume_get_all_chans(self, obj):
# Purpose of this func can be a bit confusing, being here next to set/change ones
'''Get "flat" volume float value for info-object as a mean of all channel values.
Note that this DOES NOT query any kind of updated values from libpulse,
and simply returns value(s) stored in passed object, i.e. same ones for same object.'''
assert_pulse_object(obj)
return obj.volume.value_flat
def event_mask_set(self, *masks):
mask = 0
for m in masks: mask |= PulseEventMaskEnum[m]._c_val
with self._pulse_op_cb() as cb:
c.pa.context_subscribe(self._ctx, mask, cb, None)
def event_callback_set(self, func):
'''Call event_listen() to start receiving these,
and be sure to raise PulseLoopStop in a callback to stop the loop.
Callback should accept single argument - PulseEventInfo instance.
Passing None will disable the thing.'''
self.event_callback = func
def event_listen(self, timeout=None, raise_on_disconnect=True):
'''Does not return until PulseLoopStop
gets raised in event callback or timeout passes.
timeout should be in seconds (float),
0 for non-blocking poll and None (default) for no timeout.
raise_on_disconnect causes PulseDisconnected exceptions by default.
Do not run any pulse operations from these callbacks.'''
assert self.event_callback
try: self._pulse_poll(timeout)
except c.pa.CallError: pass # e.g. from mainloop_dispatch() on disconnect
if raise_on_disconnect and not self.connected: raise PulseDisconnected()
def event_listen_stop(self):
'''Stop event_listen() loop from e.g. another thread.
Does nothing if libpulse poll is not running yet, so might be racey with
event_listen() - be sure to call it in a loop until event_listen returns or something.'''
self._loop_stop = True
c.pa.mainloop_wakeup(self._loop)
def set_poll_func(self, func, func_err_handler=None):
'''Can be used to integrate pulse client into existing eventloop.
Function will be passed a list of pollfd structs and timeout value (seconds, float),
which it is responsible to use and modify (set poll flags) accordingly,
returning int value >= 0 with number of fds that had any new events within timeout.
func_err_handler defaults to traceback.print_exception(),
and will be called on any exceptions from callback (to e.g. log these),
returning poll error code (-1) to libpulse after that.'''
if not func_err_handler: func_err_handler = traceback.print_exception
self._pa_poll_cb = c.PA_POLL_FUNC_T(ft.partial(self._pulse_poll_cb, func, func_err_handler))
c.pa.mainloop_set_poll_func(self._loop, self._pa_poll_cb, None)
def get_peak_sample(self, source, timeout, stream_idx=None):
'''Returns peak (max) value in 0-1.0 range for samples in source/stream within timespan.
"source" can be either int index of pulseaudio source
(i.e. source.index), its name (source.name), or None to use default source.
Resulting value is what pulseaudio returns as
PA_SAMPLE_FLOAT32NE float after "timeout" seconds.
If specified source does not exist, 0 should be returned after timeout.
This can be used to detect if there's any sound
on the microphone or any sound played through a sink via its monitor_source index,
or same for any specific stream connected to these (if "stream_idx" is passed).
Sample stream masquerades as
application.id=org.PulseAudio.pavucontrol to avoid being listed in various mixer apps.
Example - get peak for specific sink input "si" for 0.8 seconds:
pulse.get_peak_sample(pulse.sink_info(si.sink).monitor_source, 0.8, si.index)'''
samples, proplist = [0], c.pa.proplist_from_string('application.id=org.PulseAudio.pavucontrol')
ss = c.PA_SAMPLE_SPEC(format=c.PA_SAMPLE_FLOAT32NE, rate=25, channels=1)
s = c.pa.stream_new_with_proplist(self._ctx, 'peak detect', c.byref(ss), None, proplist)
c.pa.proplist_free(proplist)
@c.PA_STREAM_REQUEST_CB_T
def read_cb(s, bs, userdata):
buff, bs = c.c_void_p(), c.c_int(bs)
c.pa.stream_peek(s, buff, c.byref(bs))
try:
if not buff or bs.value < 4: return
# This assumes that native byte order for floats is BE, same as pavucontrol
samples[0] = max(samples[0], c.cast(buff, c.POINTER(c.c_float))[0])
finally:
# stream_drop() flushes buffered data (incl. buff=NULL "hole" data)
# stream.h: "should not be called if the buffer is empty"
if bs.value: c.pa.stream_drop(s)
if stream_idx is not None: c.pa.stream_set_monitor_stream(s, stream_idx)
c.pa.stream_set_read_callback(s, read_cb, None)
if source is not None: source = unicode(source).encode('utf-8')
try:
c.pa.stream_connect_record( s, source,
c.PA_BUFFER_ATTR(fragsize=4, maxlength=2**32-1),
c.PA_STREAM_DONT_MOVE | c.PA_STREAM_PEAK_DETECT |
c.PA_STREAM_ADJUST_LATENCY | c.PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND )
except c.pa.CallError:
c.pa.stream_unref(s)
raise
try: self._pulse_poll(timeout)
finally:
try: c.pa.stream_disconnect(s)
except c.pa.CallError: pass # stream was removed
c.pa.stream_unref(s)
return min(1.0, samples[0])
def play_sample(self, name, sink=None, volume=1.0, proplist_str=None):
'''Play specified sound sample,
with an optional sink object/name/index, volume and proplist string parameters.
Sample must be stored on the server in advance, see e.g. "pacmd list-samples".
See also libcanberra for an easy XDG theme sample loading, storage and playback API.'''
if isinstance(sink, PulseSinkInfo): sink = sink.index
sink = str(sink) if sink is not None else None
proplist = c.pa.proplist_from_string(proplist_str) if proplist_str else None
volume = int(round(volume*c.PA_VOLUME_NORM))
with self._pulse_op_cb() as cb:
try:
if not proplist:
c.pa.context_play_sample(self._ctx, name, sink, volume, cb, None)
else:
c.pa.context_play_sample_with_proplist(
self._ctx, name, sink, volume, proplist, cb, None )
except c.pa.CallError as err: raise PulseOperationInvalid(err.args[-1])
def connect_to_cli(server=None, as_file=True, socket_timeout=1.0, attempts=5, retry_delay=0.3):
'''Returns connected CLI interface socket (as file object, unless as_file=False),
where one can send same commands (as lines) as to "pacmd" tool
or pulseaudio startup files (e.g. "default.pa").
"server" option can be specified to use non-standard unix socket path
(when passed absolute path string) or remote tcp socket,
when passed remote host address (to use default port) or (host, port) tuple.
Be sure to adjust "socket_timeout" option for tcp sockets over laggy internet.
Returned file object has line-buffered output,
so there should be no need to use flush() after every command.
Be sure to read from the socket line-by-line until
"### EOF" or timeout for commands that have output (e.g. "dump\\n").
If default server socket is used (i.e. not specified),
server pid will be signaled to load module-cli between connection attempts.
Completely separate protocol from the regular API, as wrapped by libpulse.
PulseError is raised on any failure.'''
import socket, errno, signal, time
s, n = None, attempts if attempts > 0 else None
try:
pid_path, sock_af, sock_t = None, socket.AF_UNIX, socket.SOCK_STREAM
if not server: server, pid_path = map(c.pa.runtime_path, ['cli', 'pid'])
else:
if not is_list(server):
server = c.force_str(server)
if not server.startswith('/'): server = server, 4712 # default port
if is_list(server):
try:
addrinfo = socket.getaddrinfo(
server[0], server[1], 0, sock_t, socket.IPPROTO_TCP )
if not addrinfo: raise socket.gaierror('No addrinfo for socket: {}'.format(server))
except (socket.gaierror, socket.error) as err:
raise PulseError( 'Failed to resolve socket parameters'
' (address, family) via getaddrinfo: {!r} - {} {}'.format(server, type(err), err) )
sock_af, sock_t, _, _, server = addrinfo[0]
s = socket.socket(sock_af, sock_t)
s.settimeout(socket_timeout)
while True:
ts = c.mono_time()
try: s.connect(server)
except socket.error as err:
if err.errno not in [errno.ECONNREFUSED, errno.ENOENT, errno.ECONNABORTED]: raise
else: break
if n:
n -= 1
if n <= 0: raise PulseError('Number of connection attempts ({}) exceeded'.format(attempts))
if pid_path:
with open(pid_path) as src: os.kill(int(src.read().strip()), signal.SIGUSR2)
time.sleep(max(0, retry_delay - (c.mono_time() - ts)))
if as_file: res = s.makefile('rw', 1)
else: res, s = s, None # to avoid closing this socket
return res
except Exception as err: # CallError, socket.error, IOError (pidfile), OSError (os.kill)
raise PulseError( 'Failed to connect to pulse'
' cli socket {!r}: {} {}'.format(server, type(err), err) )
finally:
if s: s.close()

View File

@ -28,7 +28,6 @@ class Settings(StartCheckMixin, Singleton):
self._PLUGINS_PATH = f"{self._HOME_CONFIG_PATH}/plugins"
self._DEFAULT_ICONS = f"{self._HOME_CONFIG_PATH}/icons"
self._CONFIG_FILE = f"{self._HOME_CONFIG_PATH}/settings.json"
self._GLADE_FILE = f"{self._HOME_CONFIG_PATH}/Main_Window.glade"
self._CSS_FILE = f"{self._HOME_CONFIG_PATH}/stylesheet.css"
self._KEY_BINDINGS_FILE = f"{self._HOME_CONFIG_PATH}/key-bindings.json"
self._PID_FILE = f"{self._HOME_CONFIG_PATH}/{app_name.lower()}.pid"
@ -36,23 +35,6 @@ class Settings(StartCheckMixin, Singleton):
self._CONTEXT_MENU = f"{self._HOME_CONFIG_PATH}/contexct_menu.json"
self._WINDOW_ICON = f"{self._DEFAULT_ICONS}/{app_name.lower()}.png"
# self._USR_CONFIG_FILE = f"{self._USR_PATH}/settings.json"
# self._PLUGINS_PATH = f"plugins"
# self._CONFIG_FILE = f"settings.json"
# self._GLADE_FILE = f"Main_Window.glade"
# self._CSS_FILE = f"stylesheet.css"
# self._KEY_BINDINGS_FILE = f"key-bindings.json"
# self._PID_FILE = f"{app_name.lower()}.pid"
# self._WINDOW_ICON = f"{app_name.lower()}.png"
# self._UI_WIDEGTS_PATH = f"ui_widgets"
# self._CONTEXT_MENU = f"contexct_menu.json"
# self._DEFAULT_ICONS = f"icons"
# with zipfile.ZipFile("files.zip", mode="r", allowZip64=True) as zf:
# with io.TextIOWrapper(zf.open("text1.txt"), encoding="utf-8") as f:
if not os.path.exists(self._HOME_CONFIG_PATH):
os.mkdir(self._HOME_CONFIG_PATH)
if not os.path.exists(self._PLUGINS_PATH):
@ -69,10 +51,6 @@ class Settings(StartCheckMixin, Singleton):
self._DEFAULT_ICONS = f"{self._USR_PATH}/icons"
if not os.path.exists(self._DEFAULT_ICONS):
raise MissingConfigError("Unable to find the application icons directory.")
if not os.path.exists(self._GLADE_FILE):
self._GLADE_FILE = f"{self._USR_PATH}/Main_Window.glade"
if not os.path.exists(self._GLADE_FILE):
raise MissingConfigError("Unable to find the application Glade file.")
if not os.path.exists(self._KEY_BINDINGS_FILE):
self._KEY_BINDINGS_FILE = f"{self._USR_PATH}/key-bindings.json"
if not os.path.exists(self._KEY_BINDINGS_FILE):
@ -154,7 +132,6 @@ class Settings(StartCheckMixin, Singleton):
def get_main_window_min_height(self) -> any: return self._main_window_mh
def get_builder(self) -> any: return self._builder
def get_paint_bg_color(self) -> any: return self.PAINT_BG_COLOR
def get_glade_file(self) -> str: return self._GLADE_FILE
def get_ui_widgets_path(self) -> str: return self._UI_WIDEGTS_PATH
def get_context_menu_data(self) -> str: return self._context_menu_data

View File

@ -41,6 +41,7 @@ class StartCheckMixin:
def _write_new_pid(self):
pid = os.getpid()
self._write_pid(pid)
print(f"{app_name} PID: {pid}")
def _clean_pid(self):
os.unlink(self._PID_FILE)

View File

@ -17,13 +17,7 @@ function main() {
path="${1}"
fi
# NOTE: Remove if you want to pass file(s) besides directories...
if [ ! -d "${path}" ]; then
echo "<change_me>: Path given not a directory..."
exit 1
fi
cd "/opt/"
python /opt/<change_me>.zip "$@"
python /opt/pulstar.zip
}
main "$@";

View File

@ -1,11 +0,0 @@
[Desktop Entry]
Name=<change_me>
GenericName=<change_me>
Comment=<change_me>
Exec=/bin/<change_me> %F
Icon=/usr/share/<change_me>/icons/<change_me>.png
Type=Application
StartupNotify=true
Categories=System;FileTools;Utility;Core;GTK;FileManager;
MimeType=
Terminal=false

View File

@ -0,0 +1,10 @@
[Desktop Entry]
Name=Pulstar
GenericName=Volume Controlls
Comment=Pulstar is a Python + Gtk app to control the volume levels
Exec=/bin/pulstar
Icon=/usr/share/pulstar/icons/pulstar.png
Type=Application
StartupNotify=true
Categories=AudioVideo;Audio;Mixer;GTK;Settings;X-XFCE-SettingsDialog;X-XFCE-HardwareSettings;
Terminal=false

View File

@ -1,28 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Generated with glade 3.40.0 -->
<interface>
<requires lib="gtk+" version="3.20"/>
<object class="GtkBox" id="glade_box">
<property name="visible">True</property>
<property name="can-focus">False</property>
<property name="orientation">vertical</property>
<child>
<object class="GtkLabel" id="glade_label">
<property name="visible">True</property>
<property name="can-focus">False</property>
<property name="label" translatable="yes">Loaded Me From Glade!</property>
</object>
<packing>
<property name="expand">False</property>
<property name="fill">True</property>
<property name="position">0</property>
</packing>
</child>
<child>
<placeholder/>
</child>
<child>
<placeholder/>
</child>
</object>
</interface>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.6 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 850 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 702 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 925 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 882 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 707 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 798 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 989 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.8 KiB

View File

Before

Width:  |  Height:  |  Size: 1.5 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

View File

Before

Width:  |  Height:  |  Size: 858 B

After

Width:  |  Height:  |  Size: 858 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

View File

Before

Width:  |  Height:  |  Size: 1.3 KiB

After

Width:  |  Height:  |  Size: 1.3 KiB

View File

@ -35,7 +35,7 @@
"error_color":"#ff0000"
},
"debugging": {
"ch_log_lvl": 10,
"ch_log_lvl": 20,
"fh_log_lvl": 20
}
}