-
-
Notifications
You must be signed in to change notification settings - Fork 18
Update upslite_plugin_1_3.py #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
milanojs
wants to merge
3
commits into
itsdarklikehell:master
Choose a base branch
from
milanojs:patch-1
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,114 +1,195 @@ | ||
| # Based on UPS Lite v1.1 from https://github.com/xenDE | ||
| # Made specifically to address the problems caused by the hardware changes in 1.3. Oh yeah I also removed the auto-shutdown feature because it's kind of broken. | ||
| # | ||
| # To setup, see page six of this manual to see how to enable i2c: | ||
| # https://github.com/linshuqin329/UPS-Lite/blob/master/UPS-Lite_V1.3_CW2015/Instructions%20for%20UPS-Lite%20V1.3.pdf | ||
| # | ||
| # Follow page seven, install the dependencies (python-smbus) and copy this script over for later use: | ||
| # https://github.com/linshuqin329/UPS-Lite/blob/master/UPS-Lite_V1.3_CW2015/UPS_Lite_V1.3_CW2015.py | ||
| # | ||
| # Now, install this plugin by copying this to the 'available-plugins' folder in your pwnagotchi, install and enable the plugin with the commands: | ||
| # sudo pwnagotchi plugins install upslite_plugin_1_3 | ||
| # sudo pwnagotchi plugins enable upslite_plugin_1_3 | ||
| # | ||
| # Now restart raspberry pi. Once back up ensure upslite_plugin_1_3 plugin is turned on in the WebUI. If there is still '0%' on your battery meter | ||
| # run the script we saved earlier and ensure that the pwnagotchi is plugged in both at the battery and the raspberry pi. The script should start trying to | ||
| # read the battery, and should be successful once there's a USB cable running power to the battery supply. | ||
|
|
||
| import logging | ||
| import struct | ||
|
|
||
| import RPi.GPIO as GPIO | ||
| import sys | ||
| import time | ||
| # Import wraps for the decorator | ||
| from functools import wraps | ||
|
|
||
| sys.path.append('/usr/local/share/pwnagotchi') # May not be needed | ||
| import pwnagotchi | ||
| import pwnagotchi.plugins as plugins | ||
| import pwnagotchi.ui.fonts as fonts | ||
| from pwnagotchi.ui.components import LabeledValue | ||
| from pwnagotchi.ui.view import BLACK | ||
|
|
||
| CW2015_ADDRESS = 0x62 | ||
| CW2015_REG_VCELL = 0x02 | ||
| CW2015_REG_SOC = 0x04 | ||
| CW2015_REG_MODE = 0x0A | ||
|
|
||
| try: | ||
| import smbus | ||
| except ImportError: | ||
| logging.error("UPSLite plugin requires smbus. Run 'sudo apt install python3-smbus'") | ||
| smbus = None | ||
|
|
||
| CW2015_ADDRESS = 0X62 | ||
| CW2015_REG_VCELL = 0X02 | ||
| CW2015_REG_SOC = 0X04 | ||
| CW2015_REG_MODE = 0X0A | ||
| CW2015_QUICKSTART_VAL = 0x30 | ||
| GPIO_PIN_CHARGING = 4 | ||
|
|
||
| log = logging.getLogger('pwnagotchi_plugins') | ||
|
|
||
| def handle_errors(log_func, default_value): | ||
| """ | ||
| Decorator factory to handle common errors (I2C, General) in UPS methods, | ||
| log them, and return a default value. | ||
| """ | ||
| def decorator(func): | ||
| @wraps(func) | ||
| def wrapper(self, *args, **kwargs): # Added 'self' to work with instance methods | ||
| try: | ||
| return func(self, *args, **kwargs) # Pass 'self' to the wrapped method | ||
| except IOError as e: | ||
| # Log I2C errors - use the provided log function (e.g., log.debug or log.error) | ||
| log_func("UPSLite: I2C Error in %s: %s", func.__name__, e) | ||
| return default_value | ||
| except Exception as e: | ||
| # Log other unexpected errors | ||
| log.error("UPSLite: Unexpected error in %s: %s", func.__name__, e, exc_info=True) # Use log.error and add traceback | ||
| return default_value | ||
| return wrapper | ||
| return decorator | ||
|
|
||
| # TODO: add enable switch in config.yml an cleanup all to the best place | ||
| class UPS: | ||
| def __init__(self): | ||
| # only import when the module is loaded and enabled | ||
| import smbus | ||
|
|
||
| # 0 = /dev/i2c-0 (port I2C0), 1 = /dev/i2c-1 (port I2C1) | ||
| if smbus is None: | ||
| raise ImportError("smbus library not found for UPS class") | ||
| self._bus = smbus.SMBus(1) | ||
| self._quickstart() # Initialize the chip ONCE here | ||
|
|
||
| def voltage(self): | ||
| # Setup GPIO - do it once here | ||
| try: | ||
| read = self._bus.read_word_data(CW2015_ADDRESS, CW2015_REG_VCELL) | ||
| swapped = struct.unpack("<H", struct.pack(">H", read))[0] | ||
| return swapped * 1.25 / 1000 / 16 | ||
| except: | ||
| return 0.0 | ||
|
|
||
| def capacity(self): | ||
| GPIO.setmode(GPIO.BCM) | ||
| GPIO.setwarnings(False) | ||
| GPIO.setup(GPIO_PIN_CHARGING, GPIO.IN) | ||
| log.debug("UPSLite: GPIO %d setup OK.", GPIO_PIN_CHARGING) | ||
| except Exception as e: | ||
| # Let this error propagate during init, or handle differently if needed | ||
| log.error("UPSLite: Error setting up GPIO %d: %s", GPIO_PIN_CHARGING, e, exc_info=True) | ||
| # Consider if the plugin should load if GPIO fails - maybe it should? | ||
|
|
||
| # _quickstart still needs its own specific error handling as it doesn't return a value | ||
| def _quickstart(self): | ||
| """Wakes up the CW2015 and triggers calculations.""" | ||
| try: | ||
| address = 0x36 | ||
| read = self._bus.read_word_data(CW2015_ADDRESS, CW2015_REG_SOC) | ||
| swapped = struct.unpack("<H", struct.pack(">H", read))[0] | ||
| return swapped / 256 | ||
| except: | ||
| return 0.0 | ||
|
|
||
| self._bus.write_byte_data(CW2015_ADDRESS, CW2015_REG_MODE, CW2015_QUICKSTART_VAL) | ||
| log.info("UPSLite: CW2015 QuickStart command sent (wrote 0x%02X to 0x%02X)", CW2015_QUICKSTART_VAL, CW2015_REG_MODE) | ||
| time.sleep(0.1) | ||
| except IOError as e: | ||
| log.error("UPSLite: I2C Error sending QuickStart command: %s", e) | ||
| except Exception as e: | ||
| log.error("UPSLite: Unexpected error during QuickStart: %s", e, exc_info=True) | ||
|
|
||
| # Apply the decorator to voltage method | ||
| @handle_errors(log.debug, 0.0) # Use log.debug for frequent I2C errors, default 0.0 | ||
| def voltage(self): | ||
| """Reads voltage. Returns voltage in V or 0.0 on error.""" | ||
| # Removed try/except block - decorator handles it | ||
| read = self._bus.read_word_data(CW2015_ADDRESS, CW2015_REG_VCELL) | ||
| swapped = struct.unpack("<H", struct.pack(">H", read))[0] | ||
| voltage = swapped * 0.305 / 1000 # Correct scale factor | ||
| return voltage | ||
|
|
||
| # Apply the decorator to capacity method | ||
| @handle_errors(log.debug, 0.0) # Use log.debug for frequent I2C errors, default 0.0 | ||
| def capacity(self): | ||
| """Reads capacity percentage. Returns % (0-100) or 0.0 on error.""" | ||
| # Removed try/except block - decorator handles it | ||
| read = self._bus.read_word_data(CW2015_ADDRESS, CW2015_REG_SOC) | ||
| swapped = struct.unpack("<H", struct.pack(">H", read))[0] | ||
| capacity = swapped / 256.0 # Correct calculation | ||
| return max(0.0, min(100.0, capacity)) # Clamp value | ||
|
|
||
| # Apply the decorator to charging method | ||
| @handle_errors(log.error, '?') # Use log.error for GPIO errors, default '?' | ||
| def charging(self): | ||
| try: | ||
| GPIO.setmode(GPIO.BCM) | ||
| GPIO.setup(4, GPIO.IN) | ||
| return "+" if GPIO.input(4) == GPIO.HIGH else "-" | ||
| except: | ||
| return "-" | ||
| """Checks charging status via GPIO. Returns '+' (charging), '-' (discharging), or '?' (error).""" | ||
| # Removed try/except block - decorator handles it | ||
| # Note: This decorator won't catch the initial GPIO setup error in __init__ | ||
| return '+' if GPIO.input(GPIO_PIN_CHARGING) == GPIO.HIGH else '-' | ||
|
|
||
|
|
||
| # The UPSLite plugin class remains the same as the previous version | ||
| class UPSLite(plugins.Plugin): | ||
| __GitHub__ = "" | ||
| __author__ = "(edited by: itsdarklikehell bauke.molenaar@gmail.com), evilsocket@gmail.com" | ||
| __version__ = "1.0.0" | ||
| __license__ = "GPL3" | ||
| __description__ = "A plugin that will add a voltage indicator for the UPS Lite v1.3" | ||
| __name__ = "UPSLite" | ||
| __help__ = "A plugin that will add a voltage indicator for the UPS Lite v1.1" | ||
| __dependencies__ = { | ||
| "pip": ["scapy"], | ||
| } | ||
| __author__ = 'evilsocket@gmail.com, LouD, dlmd, Juan_milano@hotmail.com' | ||
| __version__ = '1.0.4' # Incremented version for decorator refactoring | ||
| __license__ = 'GPL3' | ||
| __description__ = 'A plugin that displays battery capacity/charging for UPS Lite v1.3 (CW2015) and supports auto-shutdown with improved error handling via decorator.' | ||
| __defaults__ = { | ||
| "enabled": False, | ||
| 'enabled': False, | ||
| 'shutdown': 5, | ||
| } | ||
|
|
||
| # ... (__init__, on_loaded, on_ui_setup, on_unload methods remain the same | ||
| # as the previous version with the outer try/except in on_ui_update) ... | ||
| def __init__(self): | ||
| self.ups = None | ||
| self.shutdown_threshold = 5 | ||
| log.debug("UPSLite plugin __init__") | ||
|
|
||
| def on_loaded(self): | ||
| self.ups = UPS() | ||
| log.info("UPSLite plugin loaded") | ||
| try: | ||
| cfg_shutdown = self.options.get('shutdown', self.__defaults__['shutdown']) | ||
| if isinstance(cfg_shutdown, int) and 0 <= cfg_shutdown <= 100: | ||
| self.shutdown_threshold = cfg_shutdown | ||
| else: | ||
| log.warning("UPSLite: Invalid shutdown value in config (%s), using default %d%%", cfg_shutdown, self.__defaults__['shutdown']) | ||
| self.shutdown_threshold = self.__defaults__['shutdown'] | ||
|
|
||
| self.ups = UPS() | ||
| log.info("UPSLite: UPS object initialized successfully. Shutdown threshold: %d%%", self.shutdown_threshold) | ||
| except Exception as e: | ||
| log.error("UPSLite: Failed to initialize UPS object: %s", e, exc_info=True) | ||
| self.ups = None | ||
|
|
||
| def on_ui_setup(self, ui): | ||
| ui.add_element( | ||
| "ups", | ||
| LabeledValue( | ||
| color=BLACK, | ||
| label="UPS", | ||
| value="0%", | ||
| position=(ui.width() / 2 + 15, 0), | ||
| label_font=fonts.Bold, | ||
| text_font=fonts.Medium, | ||
| ), | ||
| ) | ||
| try: | ||
| log.debug("UPSLite: Setting up UI element 'ups'") | ||
| ui.add_element('ups', LabeledValue(color=BLACK, label='UPS', value='--', position=(ui.width() // 2 + 15, 0), | ||
| label_font=fonts.Bold, text_font=fonts.Medium)) | ||
| except Exception as e: | ||
| log.error("UPSLite: Error setting up UI: %s", e, exc_info=True) | ||
|
|
||
| def on_unload(self, ui): | ||
| with ui._lock: | ||
| ui.remove_element("ups") | ||
|
|
||
| try: | ||
| log.info("UPSLite plugin unloaded") | ||
| with ui._lock: | ||
| if 'ups' in ui._state._state: | ||
| ui.remove_element('ups') | ||
| except Exception as e: | ||
| log.error("UPSLite: Error during unload: %s", e, exc_info=True) | ||
|
|
||
| # on_ui_update still benefits from its outer try/except block | ||
| def on_ui_update(self, ui): | ||
| capacity = self.ups.capacity() | ||
| charging = self.ups.charging() | ||
| ui.set("ups", "%2i%s" % (capacity, charging)) | ||
|
|
||
| def on_webhook(self, path, request): | ||
| logging.info(f"[{self.__class__.__name__}] webhook pressed") | ||
| try: # Outer block for overall method safety | ||
| if self.ups: | ||
| # No inner try/except needed here now, decorator handles method errors | ||
| capacity = self.ups.capacity() # Will return 0.0 on error | ||
| charging = self.ups.charging() # Will return '?' on error | ||
| capacity_int = int(round(capacity)) | ||
|
|
||
| # Handle potential '?' from charging error if needed | ||
| display_charging = charging if charging in ['+', '-'] else '-' # Default to '-' if error '?' | ||
|
|
||
| ui.set('ups', "%2i%%%s" % (capacity_int, display_charging)) | ||
|
|
||
| # Check for shutdown condition ONLY if not charging (and no charging error) | ||
| if charging == '-': # Only shutdown if definitively discharging | ||
| if capacity_int <= self.shutdown_threshold: | ||
| log.warning('[UPSLite] Low battery (%.1f%% <= %d%%) and not charging: shutting down!', capacity, self.shutdown_threshold) | ||
| try: | ||
| ui.set('ups', "%2i%%%s" % (capacity_int, display_charging)) | ||
| ui.update(force=True, new_data={'status': 'Battery low (%d%%), shutting down...' % capacity_int}) | ||
| time.sleep(2) | ||
| except Exception as ui_e: | ||
| log.error("UPSLite: Failed to update UI before shutdown: %s", ui_e) | ||
| pwnagotchi.shutdown() | ||
|
|
||
| else: # self.ups is None | ||
| try: | ||
| ui.set('ups', "--") | ||
| except Exception as e_ui_else: | ||
| log.error("UPSLite: Failed to set UI to default '--' state: %s", e_ui_else) | ||
|
|
||
| except Exception as e_outer: | ||
| log.error("UPSLite: Unhandled exception in on_ui_update: %s", e_outer, exc_info=True) | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.