4
4
NetworkDeviceWireless ,
5
5
IPv4Config ,
6
6
)
7
- from share .decoratos import call_registered_functions
8
7
from sdbus import sd_bus_open_system
9
8
from threading import Thread
10
- from time import sleep
9
+ from time import sleep , time
11
10
from enum import Enum
11
+ from share .decoratos import call_registered_functions
12
12
13
13
14
14
class InterfaceType (Enum ):
15
- WIFI = 1
16
- ETHERNET = 2
15
+ ETHERNET = 1
16
+ WIFI = 2
17
17
18
18
19
19
class InterfaceIP4Config :
@@ -22,7 +22,7 @@ def __init__(self) -> None:
22
22
self .gateway : list [str ] = []
23
23
24
24
def __repr__ (self ) -> str :
25
- return "" .join (self .ip ).join (self .gateway )
25
+ return f"IP: { ', ' .join (self .ip )} , Gateway: { ', ' .join (self .gateway )} "
26
26
27
27
28
28
class Interface :
@@ -32,70 +32,108 @@ def __init__(self, type: InterfaceType, name: str) -> None:
32
32
self .name = name
33
33
34
34
def __repr__ (self ) -> str :
35
- return "" . join ( str ( self .type .value )). join ( str ( self .ip4 ))
35
+ return f"Type: { self .type .name } , Name: { self . name } , { self .ip4 } "
36
36
37
37
38
38
class EthernetInterface (Interface ):
39
- pass
39
+ def __init__ (self , name : str ) -> None :
40
+ super ().__init__ (InterfaceType .ETHERNET , name )
40
41
41
42
42
43
class WifiInterface (Interface ):
43
- def __init__ (self , type : InterfaceType , name : str ) -> None :
44
- super ().__init__ (type , name )
44
+ def __init__ (self , name : str ) -> None :
45
+ super ().__init__ (InterfaceType . WIFI , name )
45
46
self .ssid : str = ""
46
47
48
+ def __repr__ (self ) -> str :
49
+ return f"SSID: { self .ssid } , { super ().__repr__ ()} "
50
+
47
51
48
52
class NetworkObject :
49
53
def __init__ (self ) -> None :
50
54
self .Interfaces : list [Interface ] = []
51
55
52
56
def PushObject (self , obj : Interface ) -> None :
53
- for int in self .Interfaces :
54
- if str (int ) == str (obj ):
57
+ for intf in self .Interfaces :
58
+ if str (intf ) == str (obj ):
55
59
return
56
60
self .Interfaces .append (obj )
57
61
58
62
def pushEthernetObject (self , interfacename : str , ip4s : list [str ], gw : list [str ]):
59
- pass
63
+ eth_int = EthernetInterface (interfacename )
64
+ eth_int .ip4 .ip = ip4s
65
+ eth_int .ip4 .gateway = gw
66
+ self .PushObject (eth_int )
60
67
61
68
def pushWifiObject (
62
69
self , interfacename : str , ssid : str , ip4s : list [str ], gw : list [str ]
63
70
):
64
- pass
71
+ wifi_int = WifiInterface (interfacename )
72
+ wifi_int .ssid = ssid
73
+ wifi_int .ip4 .ip = ip4s
74
+ wifi_int .ip4 .gateway = gw
75
+ self .PushObject (wifi_int )
65
76
66
77
67
78
class NetworkService (Thread ):
68
79
def __init__ (self ):
69
80
super ().__init__ ()
70
81
self .Network : NetworkObject = NetworkObject ()
82
+ self .last_network_state = None
83
+ self .last_call_time = time ()
84
+
85
+ def has_state_changed (self ):
86
+ current_state = str (self .Network .Interfaces )
87
+ if self .last_network_state != current_state :
88
+ self .last_network_state = current_state
89
+ return True
90
+ return False
71
91
72
92
def run (self ) -> None :
73
93
system_bus = sd_bus_open_system () # We need system bus
74
94
nm = NetworkManager (system_bus )
75
95
while True :
76
96
try :
77
97
devices_paths = nm .get_devices ()
78
- new_ips = []
98
+ self . Network . Interfaces = [] # Reset the network interfaces list
79
99
for device_path in devices_paths :
80
100
generic_device = NetworkDeviceGeneric (device_path , system_bus )
81
- # print(generic_device.device_type, generic_device.interface)
82
- if generic_device .device_type == 2 :
83
- pass
84
- # print(NetworkDeviceWireless(device_path, system_bus).get_applied_connection()[0]['connection'])
85
101
device_ip4_conf_path = generic_device .ip4_config
86
102
if device_ip4_conf_path == "/" :
87
103
continue
88
104
else :
89
105
ip4_conf = IPv4Config (device_ip4_conf_path , system_bus )
90
- for address_data in ip4_conf .address_data :
91
- # print(' Ip Adress:', address_data['address'][1])
92
- new_ips .append (address_data ["address" ][1 ])
93
- self .ip4s = new_ips
94
- call_registered_functions ("updateip" , None )
106
+ ip_addresses = [
107
+ address_data ["address" ][1 ]
108
+ for address_data in ip4_conf .address_data
109
+ ]
110
+ gw = str (ip4_conf .gateway ) if ip4_conf .gateway else "0.0.0.0"
111
+ gateways : list [str ] = [gw ]
112
+ if generic_device .device_type == InterfaceType .ETHERNET .value :
113
+ self .Network .pushEthernetObject (
114
+ generic_device .interface , ip_addresses , gateways
115
+ )
116
+ elif generic_device .device_type == InterfaceType .WIFI .value :
117
+ wifi_device = NetworkDeviceWireless (device_path , system_bus )
118
+ self .Network .pushWifiObject (
119
+ generic_device .interface ,
120
+ "" ,
121
+ ip_addresses ,
122
+ gateways ,
123
+ )
124
+
125
+ current_time = time ()
126
+ if (
127
+ self .has_state_changed ()
128
+ or (current_time - self .last_call_time ) >= 10
129
+ ):
130
+ call_registered_functions ("updateip" , None )
131
+ self .last_call_time = current_time
95
132
96
133
sleep (1 )
97
134
except Exception as e :
98
135
print (e )
136
+ sleep (1 )
99
137
100
138
101
139
ns : NetworkService | None = None
0 commit comments