Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 20 additions & 26 deletions killerbee/dev_sewio.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
import time # type: ignore
import struct # type: ignore
import time # type: ignore
try:
import urllib.request as urllib2 # type: ignore
except ImportError:
import urllib2 # type: ignore
import urllib3 # type: ignore
import re # type: ignore
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_REUSEADDR, timeout as error_timeout # type: ignore
from struct import unpack # type: ignore
Expand All @@ -27,23 +24,23 @@
DEFAULT_IP = "10.10.10.2" #IP address of the sniffer
DEFAULT_GW = "10.10.10.1" #IP address of the default gateway
DEFAULT_UDP = 17754 #"Remote UDP Port"
TESTED_FW_VERS = ["0.5", "0.9"] #Firmware versions tested with the current version of this client device connector
TESTED_FW_VERS = ["0.5", "0.9", "0.9.0"] #Firmware versions tested with the current version of this client device connector

NTP_DELTA = 70*365*24*60*60 #datetime(1970, 1, 1, 0, 0, 0) - datetime(1900, 1, 1, 0, 0, 0)

'''
Convert the two parts of an NTP timestamp to a datetime object.
Similar code from Wireshark source:
575 /* NTP_BASETIME is in fact epoch - ntp_start_time */
576 #define NTP_BASETIME 2208988800ul
619 void
620 ntp_to_nstime(tvbuff_t *tvb, gint offset, nstime_t *nstime)
621 {
622 nstime->secs = tvb_get_ntohl(tvb, offset);
623 if (nstime->secs)
624 nstime->secs -= NTP_BASETIME;
625 nstime->nsecs = (int)(tvb_get_ntohl(tvb, offset+4)/(NTP_FLOAT_DENOM/1000000000.0));
626 }
575 /* NTP_BASETIME is in fact epoch - ntp_start_time */
576 #define NTP_BASETIME 2208988800ul
619 void
620 ntp_to_nstime(tvbuff_t *tvb, gint offset, nstime_t *nstime)
621 {
622 nstime->secs = tvb_get_ntohl(tvb, offset);
623 if (nstime->secs)
624 nstime->secs -= NTP_BASETIME;
625 nstime->nsecs = (int)(tvb_get_ntohl(tvb, offset+4)/(NTP_FLOAT_DENOM/1000000000.0));
626 }
'''
def ntp_to_system_time(secs, msecs):
"""convert a NTP time to system time"""
Expand All @@ -53,10 +50,9 @@ def ntp_to_system_time(secs, msecs):

def getFirmwareVersion(ip):
try:
fw = re.search(r'Firmware version ([0-9.]+)', html.read())
#TODO: Have timeout handled sooner
html = urllib2.urlopen("http://{0}/".format(ip), timeout=1)
data = html.read()
html = urllib3.request("GET", "http://{0}/".format(ip))
data = html.data.decode("utf-8")
# First try for the "old" web UI parsing:
fw = re.search(r'Firmware version ([0-9.]+)', data)
if fw is not None:
Expand All @@ -83,10 +79,10 @@ def getMacAddr(ip):
Returns a string for the MAC address of the sniffer.
'''
try:
html = urllib.request.urlopen("http://{0}/".format(ip))
html = urllib3.request("GET", "http://{0}/".format(ip))
# Yup, we're going to have to steal the status out of a JavaScript variable
#var values = removeSSItag('<!--#pindex-->STOPPED,00:1a:b6:00:0a:a4,...
res = re.search(r'<!--#pindex-->[A-Z]+,((?:[0-9a-f]{2}:){5}[0-9a-f]{2})', html.read())
res = re.search(r'<!--#pindex-->[A-Z]+,((?:[0-9a-f]{2}:){5}[0-9a-f]{2})', html.data.decode("utf-8"))
if res is None:
raise KBInterfaceError("Unable to parse the sniffer's MAC address.")
return res.group(1)
Expand Down Expand Up @@ -182,11 +178,11 @@ def __make_rest_call(self, path, fetch=True):
returns True if an HTTP 200 code was received.
'''
try:
html = urllib.request.urlopen("http://{0}/{1}".format(self.dev, path))
html = urllib3.request("GET", "http://{0}/{1}".format(self.dev, path))
if fetch:
return html.read()
return html.data.decode("utf-8")
else:
return (html.getcode() == 200)
return (html.status == 200)
except Exception as e:
raise KBInterfaceError("Unable to preform a call to {0}/{1} (error: {2}).".format(self.dev, path, e))

Expand Down Expand Up @@ -349,8 +345,7 @@ def inject(self, packet, channel=None, count=1, delay=0, page=0):
self.set_channel(channel)

packet_length = len(packet)
packet = packet.encode('latin-1')

packet = packet.decode('latin-1')
self.__make_rest_call(
"inject.cgi?chn={0}&modul=0&txlevel=0&rxen=1&nrepeat={1}&tspace={2}&autocrc=1&spayload={3}&len={4}".format(
self._channel, count, delay, packet, packet_length
Expand Down Expand Up @@ -516,4 +511,3 @@ def jammer_off(self):

if not self.__make_rest_call("status.cgi?p=4"):
raise KBInterfaceError("Error instructing sniffer to stop jamming.")