@ -8,21 +8,28 @@ import typing
import multiprocessing
import enum
Record = typing . Tuple [ typing . Callable , typing . Callable , int , str , str ]
Record = typing . Tuple [ typing . Callable ,
typing . Callable , int , database . Path , database . Path ]
# select, write
# select, write, name_packer, value_packer
FUNCTION_MAP : typing . Any = {
' a ' : (
database . Database . get_ip4 ,
database . Database . set_hostname ,
database . Database . pack_domain ,
database . Database . pack_ip4address ,
) ,
' cname ' : (
database . Database . get_domain ,
database . Database . set_hostname ,
database . Database . pack_domain ,
database . Database . pack_domain ,
) ,
' ptr ' : (
database . Database . get_domain ,
database . Database . set_ip4address ,
database . Database . pack_ip4address ,
database . Database . pack_domain ,
) ,
}
@ -49,11 +56,8 @@ class Writer(multiprocessing.Process):
select , write , updated , name , value = record
self . db . enter_step ( ' feed_switch ' )
try :
for source in select ( self . db , value ) :
write ( self . db , name , updated , source = source )
except ValueError :
self . log . exception ( " Cannot execute: %s " , record )
for source in select ( self . db , value ) :
write ( self . db , name , updated , source = source )
self . db . enter_step ( ' block_wait ' )
@ -76,8 +80,33 @@ class Parser():
self . prof = database . Profiler ( )
self . prof . log = logging . getLogger ( ' pr ' )
def register ( self , record : Record ) - > None :
self . prof . enter_step ( ' register ' )
def register ( self ,
rtype : str ,
timestamp : int ,
name_str : str ,
value_str : str ,
) - > None :
self . prof . enter_step ( ' pack ' )
try :
select , write , name_packer , value_packer = FUNCTION_MAP [ rtype ]
except KeyError :
self . log . exception ( " Unknown record type " )
return
try :
name = name_packer ( name_str )
except ValueError :
self . log . exception ( " Cannot parse name ( ' %s ' with %s ) " ,
name_str , name_packer )
return
try :
value = value_packer ( value_str )
except ValueError :
self . log . exception ( " Cannot parse value ( ' %s ' with %s ) " ,
value_str , value_packer )
return
record = ( select , write , timestamp , name , value )
self . prof . enter_step ( ' grow_block ' )
self . block . append ( record )
if len ( self . block ) > = self . block_size :
self . prof . enter_step ( ' put_block ' )
@ -96,6 +125,7 @@ class Parser():
class Rapid7Parser ( Parser ) :
def consume ( self ) - > None :
data = dict ( )
self . prof . enter_step ( ' iowait ' )
for line in self . buf :
self . prof . enter_step ( ' parse_rapid7 ' )
split = line . split ( ' " ' )
@ -106,26 +136,25 @@ class Rapid7Parser(Parser):
val = split [ k + 2 ]
data [ key ] = val
select , writer = FUNCTION_MAP [ data [ ' type ' ] ]
record = (
select ,
writer ,
self . register (
data [ ' type ' ] ,
int ( data [ ' timestamp ' ] ) ,
data [ ' name ' ] ,
data [ ' value ' ]
data [ ' value ' ] ,
)
except IndexError :
self . prof . enter_step ( ' iowait ' )
except KeyError :
# Sometimes JSON records are off the place
self . log . exception ( " Cannot parse: %s " , line )
self . register ( record )
class DnsMassParser ( Parser ) :
# dnsmass --output Snrql
# --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
TYPES = {
' A ' : ( FUNCTION_MAP [ ' a ' ] [ 0 ] , FUNCTION_MAP [ ' a ' ] [ 1 ] , - 1 , None ) ,
# 'AAAA': (FUNCTION_MAP[ 'aaaa'][0], FUNCTION_MAP['aaaa'][1] , -1, None),
' CNAME ' : ( FUNCTION_MAP [ ' cname ' ] [ 0 ] , FUNCTION_MAP [ ' cname ' ] [ 1 ] , - 1 , - 1 ) ,
' A ' : ( ' a ' , - 1 , None ) ,
# 'AAAA': ('aaaa', -1, None),
' CNAME ' : ( ' cname ' , - 1 , - 1 ) ,
}
def consume ( self ) - > None :
@ -144,19 +173,19 @@ class DnsMassParser(Parser):
timestamp = int ( split [ 1 ] )
header = False
else :
select , w ri te, name_offset , value_offset = \
rtyp e , name_offset , value_offset = \
DnsMassParser . TYPES [ split [ 1 ] ]
record = (
select ,
write ,
self . register (
rtype ,
timestamp ,
split [ 0 ] [ : name_offset ] ,
split [ 2 ] [ : value_offset ] ,
)
self . register ( record )
self . prof . enter_step ( ' parse_dnsmass ' )
except KeyError :
continue
# Malformed records are less likely to happen,
# but we may never be sure
self . log . exception ( " Cannot parse: %s " , line )
PARSERS = {
@ -189,7 +218,7 @@ if __name__ == '__main__':
args = args_parser . parse_args ( )
recs_queue : multiprocessing . Queue = multiprocessing . Queue (
maxsize = args . queue_size )
maxsize = args . queue_size )
writer = Writer ( recs_queue )
writer . start ( )