parent
57416b6e2c
commit
e19f666331
7 changed files with 72 additions and 73 deletions
43
database.py
43
database.py
|
@ -33,6 +33,9 @@ class Database():
|
|||
# self.conn.create_function("prepare_ip4address", 1,
|
||||
# Database.prepare_ip4address,
|
||||
# deterministic=True)
|
||||
self.conn.create_function("unpack_domain", 1,
|
||||
lambda s: s[:-1][::-1],
|
||||
deterministic=True)
|
||||
|
||||
def execute(self, cmd: str, args: typing.Union[
|
||||
typing.Tuple[DbValue, ...],
|
||||
|
@ -123,6 +126,13 @@ class Database():
|
|||
def prepare_zone(self, zone: str) -> str:
|
||||
return self.prepare_hostname(zone)
|
||||
|
||||
@staticmethod
|
||||
def prepare_asn(asn: str) -> int:
|
||||
asn = asn.upper()
|
||||
if asn.startswith('AS'):
|
||||
asn = asn[2:]
|
||||
return int(asn)
|
||||
|
||||
@staticmethod
|
||||
def prepare_ip4address(address: str) -> int:
|
||||
total = 0
|
||||
|
@ -169,7 +179,7 @@ class Database():
|
|||
|
||||
def export(self, first_party_only: bool = False,
|
||||
end_chain_only: bool = False) -> typing.Iterable[str]:
|
||||
command = 'SELECT val FROM rules ' \
|
||||
command = 'SELECT unpack_domain(val) FROM rules ' \
|
||||
'INNER JOIN hostname ON rules.id = hostname.entry'
|
||||
restrictions: typing.List[str] = list()
|
||||
if first_party_only:
|
||||
|
@ -178,9 +188,10 @@ class Database():
|
|||
restrictions.append('rules.refs = 0')
|
||||
if restrictions:
|
||||
command += ' WHERE ' + ' AND '.join(restrictions)
|
||||
command += ' ORDER BY unpack_domain(val) ASC'
|
||||
self.execute(command)
|
||||
for val, in self.cursor:
|
||||
yield val[:-1][::-1]
|
||||
yield val
|
||||
|
||||
def get_domain(self, domain: str) -> typing.Iterable[int]:
|
||||
self.enter_step('get_domain_prepare')
|
||||
|
@ -235,6 +246,13 @@ class Database():
|
|||
self.enter_step('get_ip4_yield')
|
||||
yield entry
|
||||
|
||||
def list_asn(self) -> typing.Iterable[typing.Tuple[str, int]]:
|
||||
self.enter_step('list_asn_select')
|
||||
self.enter_step('get_domain_select')
|
||||
self.execute('SELECT val, entry FROM asn')
|
||||
for val, entry in self.cursor:
|
||||
yield f'AS{val}', entry
|
||||
|
||||
def _set_generic(self,
|
||||
table: str,
|
||||
select_query: str,
|
||||
|
@ -325,8 +343,29 @@ class Database():
|
|||
*args, **kwargs
|
||||
)
|
||||
|
||||
def set_asn(self, asn: str,
|
||||
*args: typing.Any, **kwargs: typing.Any) -> None:
|
||||
self.enter_step('set_asn_prepare')
|
||||
try:
|
||||
asn_prep = self.prepare_asn(asn)
|
||||
except ValueError:
|
||||
self.log.error("Invalid asn: %s", asn)
|
||||
return
|
||||
prep: typing.Dict[str, DbValue] = {
|
||||
'val': asn_prep,
|
||||
}
|
||||
self._set_generic(
|
||||
'asn',
|
||||
'SELECT entry FROM asn WHERE val=:val',
|
||||
'INSERT INTO asn (val, entry) '
|
||||
'VALUES (:val, :entry)',
|
||||
prep,
|
||||
*args, **kwargs
|
||||
)
|
||||
|
||||
def set_ip4address(self, ip4address: str,
|
||||
*args: typing.Any, **kwargs: typing.Any) -> None:
|
||||
# TODO Do not add if already in ip4network
|
||||
self.enter_step('set_ip4add_prepare')
|
||||
try:
|
||||
ip4address_prep = self.prepare_ip4address(ip4address)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue