from __future__ import print_function import os import sys from contextlib import contextmanager import importlib import signal import site import traceback import weakref from _hexchat_embedded import ffi, lib if not hasattr(sys, 'argv'): sys.argv = [''] VERSION = b'2.0' # Sync with hexchat.__version__ PLUGIN_NAME = ffi.new('char[]', b'Python') PLUGIN_DESC = ffi.new('char[]', b'Python %d.%d scripting interface' % (sys.version_info[0], sys.version_info[1])) PLUGIN_VERSION = ffi.new('char[]', VERSION) hexchat = None local_interp = None hexchat_stdout = None plugins = set() @contextmanager def redirected_stdout(): sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ yield sys.stdout = hexchat_stdout sys.stderr = hexchat_stdout if os.environ.get('HEXCHAT_LOG_PYTHON'): def log(*args): with redirected_stdout(): print(*args) else: def log(*args): pass class Stdout: def __init__(self): self.buffer = bytearray() def write(self, string): string = string.encode() idx = string.rfind(b'\n') if idx is not -1: self.buffer += string[:idx] lib.hexchat_print(lib.ph, bytes(self.buffer)) self.buffer = bytearray(string[idx + 1:]) else: self.buffer += string def isatty(self): # FIXME: help() locks app despite this? 
return False class Attribute: def __init__(self): self.time = 0 def __repr__(self): return ''.format(id(self)) class Hook: def __init__(self, plugin, callback, userdata, is_unload): self.is_unload = is_unload self.plugin = weakref.proxy(plugin) self.callback = callback self.userdata = userdata self.hexchat_hook = None self.handle = ffi.new_handle(weakref.proxy(self)) def __del__(self): log('Removing hook', id(self)) if self.is_unload is False: assert self.hexchat_hook is not None lib.hexchat_unhook(lib.ph, self.hexchat_hook) if sys.version_info[0] is 2: def compile_file(data, filename): return compile(data, filename, 'exec', dont_inherit=True) def compile_line(string): try: return compile(string, '', 'eval', dont_inherit=True) except SyntaxError: # For some reason `print` is invalid for eval # This will hide any return value though return compile(string, '', 'exec', dont_inherit=True) else: def compile_file(data, filename): return compile(data, filename, 'exec', optimize=2, dont_inherit=True) def compile_line(string): # newline appended to solve unexpected EOF issues return compile(string + '\n', '', 'single', optimize=2, dont_inherit=True) class Plugin: def __init__(self): self.ph = None self.name = '' self.filename = '' self.version = '' self.description = '' self.hooks = set() self.globals = { '__plugin': weakref.proxy(self), '__name__': '__main__', } def add_hook(self, callback, userdata, is_unload=False): hook = Hook(self, callback, userdata, is_unload=is_unload) self.hooks.add(hook) return hook def remove_hook(self, hook): for h in self.hooks: if id(h) == hook: ud = h.userdata self.hooks.remove(h) return ud else: log('Hook not found') def loadfile(self, filename): try: self.filename = filename with open(filename) as f: data = f.read() compiled = compile_file(data, filename) exec(compiled, self.globals) try: self.name = self.globals['__module_name__'] except KeyError: lib.hexchat_print(lib.ph, b'Failed to load module: __module_name__ must be set') return False 
self.version = self.globals.get('__module_version__', '') self.description = self.globals.get('__module_description__', '') self.ph = lib.hexchat_plugingui_add(lib.ph, filename.encode(), self.name.encode(), self.description.encode(), self.version.encode(), ffi.NULL) except Exception as e: lib.hexchat_print(lib.ph, 'Failed to load module: {}'.format(e).encode()) traceback.print_exc() return False return True def __del__(self): log('unloading', self.filename) for hook in self.hooks: if hook.is_unload is True: try: hook.callback(hook.userdata) except Exception as e: log('Failed to run hook:', e) traceback.print_exc() del self.hooks if self.ph is not None: lib.hexchat_plugingui_remove(lib.ph, self.ph) if sys.version_info[0] is 2: def __decode(string): return string else: def __decode(string): return string.decode() # There can be empty entries between non-empty ones so find the actual last value def wordlist_len(words): for i in range(31, 1, -1): if ffi.string(words[i]): return i return 0 def create_wordlist(words): size = wordlist_len(words) return [__decode(ffi.string(words[i])) for i in range(1, size + 1)] # This function only exists for compat reasons with the C plugin # It turns the word list from print hooks into a word_eol list # This makes no sense to do... 
def create_wordeollist(words):
    """Build a word_eol-style list from a list of words.

    Entry ``i`` is ``words[i:]`` joined with single spaces, skipping empty
    words; an empty word reuses the tail built so far.
    """
    accum = None
    ret = []
    # Walk backwards so each tail is built from the previous one, then flip.
    for word in reversed(words):
        if accum is None:
            accum = word
        elif word:
            accum = ' '.join((word, accum))
        ret.append(accum)
    ret.reverse()
    return ret


def to_cb_ret(value):
    """Map a Python callback's return value to an int (None becomes 0)."""
    if value is None:
        return 0
    return int(value)


@ffi.def_extern()
def _on_command_hook(word, word_eol, userdata):
    """C entry point for command hooks: forward to the Hook's Python callback."""
    hook = ffi.from_handle(userdata)
    word = create_wordlist(word)
    word_eol = create_wordlist(word_eol)
    return to_cb_ret(hook.callback(word, word_eol, hook.userdata))


@ffi.def_extern()
def _on_print_hook(word, userdata):
    """C entry point for print hooks; word_eol is synthesised from word."""
    hook = ffi.from_handle(userdata)
    word = create_wordlist(word)
    word_eol = create_wordeollist(word)
    return to_cb_ret(hook.callback(word, word_eol, hook.userdata))


@ffi.def_extern()
def _on_print_attrs_hook(word, attrs, userdata):
    """Like _on_print_hook, additionally passing an Attribute with the event time."""
    hook = ffi.from_handle(userdata)
    word = create_wordlist(word)
    word_eol = create_wordeollist(word)
    attr = Attribute()
    attr.time = attrs.server_time_utc
    return to_cb_ret(hook.callback(word, word_eol, hook.userdata, attr))


@ffi.def_extern()
def _on_server_hook(word, word_eol, userdata):
    """C entry point for server hooks: forward to the Hook's Python callback."""
    hook = ffi.from_handle(userdata)
    word = create_wordlist(word)
    word_eol = create_wordlist(word_eol)
    return to_cb_ret(hook.callback(word, word_eol, hook.userdata))


@ffi.def_extern()
def _on_server_attrs_hook(word, word_eol, attrs, userdata):
    """Like _on_server_hook, additionally passing an Attribute with the event time."""
    hook = ffi.from_handle(userdata)
    word = create_wordlist(word)
    word_eol = create_wordlist(word_eol)
    attr = Attribute()
    attr.time = attrs.server_time_utc
    return to_cb_ret(hook.callback(word, word_eol, hook.userdata, attr))


@ffi.def_extern()
def _on_timer_hook(userdata):
    """C entry point for timers.

    Returns 1 only when the callback returns exactly True; otherwise the
    hook is dropped from its plugin and 0 is returned.
    """
    hook = ffi.from_handle(userdata)
    if hook.callback(hook.userdata) is True:
        return 1

    hook.is_unload = True  # Don't unhook
    for h in hook.plugin.hooks:
        if h == hook:
            hook.plugin.hooks.remove(h)
            break
    return 0


@ffi.def_extern(error=3)
def _on_say_command(word, word_eol, userdata):
    """Treat text typed in the '>>python<<' tab as console input.

    Returns 1 when the line was consumed, 0 otherwise (3 if an exception
    escapes, per the decorator).
    NOTE(review): these look like HexChat "eat" codes -- confirm against the
    plugin API.
    """
    channel = ffi.string(lib.hexchat_get_info(lib.ph, b'channel'))
    if channel == b'>>python<<':
        python = ffi.string(word_eol[1])
        lib.hexchat_print(lib.ph, b'>>> ' + python)
        exec_in_interp(__decode(python))
        return 1
    return 0


def load_filename(filename):
    """Load the script at *filename* unless one with that path is already loaded.

    Relative paths are resolved against ``<configdir>/addons``. Returns True
    if a new plugin was loaded.
    """
    filename = os.path.expanduser(filename)
    if not os.path.isabs(filename):
        configdir = __decode(ffi.string(lib.hexchat_get_info(lib.ph, b'configdir')))
        filename = os.path.join(configdir, 'addons', filename)
    if filename and not any(plugin.filename == filename for plugin in plugins):
        plugin = Plugin()
        if plugin.loadfile(filename):
            plugins.add(plugin)
            return True
    return False


def _find_plugin(name):
    """Return the loaded plugin matching *name* by name, path or basename, else None."""
    for plugin in plugins:
        if name in (plugin.name, plugin.filename, os.path.basename(plugin.filename)):
            return plugin
    return None


def unload_name(name):
    """Unload the plugin identified by *name*; return True if one was found."""
    if name:
        plugin = _find_plugin(name)
        if plugin is not None:
            plugins.remove(plugin)
            return True
    return False


def reload_name(name):
    """Unload, then load again, the plugin identified by *name*.

    Returns the result of the load, or False if no plugin matched.
    """
    if name:
        plugin = _find_plugin(name)
        if plugin is not None:
            filename = plugin.filename
            plugins.remove(plugin)
            # Drop our own reference too, so that (with CPython refcounting)
            # the old plugin is torn down before the file is loaded again
            # rather than after.
            del plugin
            return load_filename(filename)
    return False


@contextmanager
def change_cwd(path):
    """Run the managed body with *path* as the working directory.

    The previous working directory is restored even if the body raises.
    """
    old_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(old_cwd)


def autoload():
    """Load every ``*.py`` file found in ``<configdir>/addons``."""
    configdir = __decode(ffi.string(lib.hexchat_get_info(lib.ph, b'configdir')))
    addondir = os.path.join(configdir, 'addons')
    try:
        with change_cwd(addondir):  # Maintaining old behavior
            for f in os.listdir(addondir):
                if f.endswith('.py'):
                    log('Autoloading', f)
                    # TODO: Set cwd
                    load_filename(os.path.join(addondir, f))
    # OSError rather than FileNotFoundError: the latter does not exist on
    # Python 2 and is a subclass of OSError on Python 3.
    except OSError as e:
        log('Autoload failed', e)


def list_plugins():
    """Print a column-aligned table of the loaded plugins to HexChat."""
    if not plugins:
        lib.hexchat_print(lib.ph, b'No python modules loaded')
        return

    tbl_headers = [b'Name', b'Version', b'Filename', b'Description']
    tbl = [
        tbl_headers,
        [(b'-' * len(s)) for s in tbl_headers]
    ]
    for plugin in plugins:
        basename = os.path.basename(plugin.filename).encode()
        name = plugin.name.encode()
        version = plugin.version.encode() if plugin.version else b''
        description = plugin.description.encode() if plugin.description else b''
        tbl.append((name, version, basename, description))

    # Widest cell of each column; zip(*tbl) transposes rows into columns.
    column_sizes = [
        max(len(item) for item in column)
        for column in zip(*tbl)
    ]
    for row in tbl:
        lib.hexchat_print(lib.ph, b' '.join(item.ljust(column_sizes[i])
                                            for i, item in enumerate(row)))
    lib.hexchat_print(lib.ph, b'')


def exec_in_interp(python):
    """Run one line of code in the shared console namespace.

    The namespace is created on first use. A non-None result is printed to
    HexChat; any exception, including a syntax error in the input, has its
    traceback written to ``hexchat_stdout``.

    NOTE: this deliberately evaluates arbitrary code passed in by the caller.
    """
    global local_interp

    if not python:
        return

    if local_interp is None:
        local_interp = Plugin()
        local_interp.locals = {}
        local_interp.globals['hexchat'] = hexchat

    try:
        # Compile inside the try so a SyntaxError is reported like any
        # other failure instead of escaping into the C callback.
        code = compile_line(python)
        ret = eval(code, local_interp.globals, local_interp.locals)
        if ret is not None:
            lib.hexchat_print(lib.ph, '{}'.format(ret).encode())
    except Exception:
        traceback.print_exc(file=hexchat_stdout)


@ffi.def_extern()
def _on_load_command(word, word_eol, userdata):
    """Handle LOAD for ``*.py`` arguments; returns 3 if handled, else 0."""
    filename = ffi.string(word[2])
    if filename.endswith(b'.py'):
        load_filename(__decode(filename))
        return 3
    return 0


@ffi.def_extern()
def _on_unload_command(word, word_eol, userdata):
    """Handle UNLOAD for ``*.py`` arguments; returns 3 if handled, else 0."""
    filename = ffi.string(word[2])
    if filename.endswith(b'.py'):
        unload_name(__decode(filename))
        return 3
    return 0


@ffi.def_extern()
def _on_reload_command(word, word_eol, userdata):
    """Handle RELOAD for ``*.py`` arguments; returns 3 if handled, else 0."""
    filename = ffi.string(word[2])
    if filename.endswith(b'.py'):
        reload_name(__decode(filename))
        return 3
    return 0


@ffi.def_extern(error=3)
def _on_py_command(word, word_eol, userdata):
    """Dispatch the PY command's subcommands; always returns 3."""
    subcmd = __decode(ffi.string(word[2])).lower()

    if subcmd == 'exec':
        python = __decode(ffi.string(word_eol[3]))
        exec_in_interp(python)
    elif subcmd == 'load':
        filename = __decode(ffi.string(word[3]))
        load_filename(filename)
    elif subcmd == 'unload':
        name = __decode(ffi.string(word[3]))
        if not unload_name(name):
            lib.hexchat_print(lib.ph, b'Can\'t find a python plugin with that name')
    elif subcmd == 'reload':
        name = __decode(ffi.string(word[3]))
        if not reload_name(name):
            lib.hexchat_print(lib.ph, b'Can\'t find a python plugin with that name')
    elif subcmd == 'console':
        lib.hexchat_command(lib.ph, b'QUERY >>python<<')
    elif subcmd == 'list':
        list_plugins()
    elif subcmd == 'about':
        lib.hexchat_print(lib.ph, b'HexChat Python interface version ' + VERSION)
    else:
        # Unknown or missing subcommand: show the usage text.
        lib.hexchat_command(lib.ph, b'HELP PY')

    return 3
@ffi.def_extern()
def _on_plugin_init(plugin_name, plugin_desc, plugin_version, arg, libdir):
    """Plugin entry point: publish metadata, import `hexchat`, register commands.

    Returns 1 on success, or 0 when the `hexchat` module cannot be imported
    or *libdir* cannot be decoded.
    """
    global hexchat
    global hexchat_stdout

    # Put SIGINT back on its default handler.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    # Hand the module-level C strings back through the out-parameters.
    plugin_name[0] = PLUGIN_NAME
    plugin_desc[0] = PLUGIN_DESC
    plugin_version[0] = PLUGIN_VERSION

    try:
        libdir = __decode(ffi.string(libdir))
        # The Python-side modules are looked up in "<libdir>/../python".
        sys.path.append(os.path.abspath(os.path.join(libdir, '..', 'python')))
        hexchat = importlib.import_module('hexchat')
    except (UnicodeDecodeError, ImportError) as err:
        lib.hexchat_print(lib.ph, b'Failed to import module: ' + repr(err).encode())
        return 0

    # From here on, Python-level output goes to the HexChat window.
    hexchat_stdout = Stdout()
    sys.stdout = hexchat_stdout
    sys.stderr = hexchat_stdout

    # (command name, C callback, help text)
    command_hooks = (
        (b'', lib._on_say_command, ffi.NULL),
        (b'LOAD', lib._on_load_command, ffi.NULL),
        (b'UNLOAD', lib._on_unload_command, ffi.NULL),
        (b'RELOAD', lib._on_reload_command, ffi.NULL),
        (b'PY', lib._on_py_command,
         b'''Usage: /PY LOAD UNLOAD RELOAD LIST EXEC CONSOLE ABOUT'''),
    )
    for cmd_name, callback, help_text in command_hooks:
        lib.hexchat_hook_command(lib.ph, cmd_name, 0, callback, help_text, ffi.NULL)

    lib.hexchat_print(lib.ph, b'Python interface loaded')
    autoload()
    return 1


@ffi.def_extern()
def _on_plugin_deinit():
    """Plugin exit point: drop all state and restore the original streams.

    Always returns 1.
    """
    global local_interp
    global hexchat
    global hexchat_stdout
    global plugins

    # Rebinding releases the loaded Plugin objects while output is still
    # routed through hexchat_stdout.
    plugins = set()
    local_interp = None
    hexchat = None
    hexchat_stdout = None

    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__

    # Forget our modules so a later init imports them afresh.
    for mod in ('_hexchat', 'hexchat', 'xchat', '_hexchat_embedded'):
        sys.modules.pop(mod, None)

    return 1