@ -31,6 +31,14 @@
from bitstring import BitArray
import copy , csv , code
from collections import namedtuple
#Street = namedtuple("Street", "name lcn")
#Street(lcn=12,name="test")
#Street(name='test', lcn=12)
#Street(lcn=12,name="test").name
language = " de " #currently supported: de, en (both partially)
SUFFIXES = { 1 : ' st ' , 2 : ' nd ' , 3 : ' rd ' }
@ -45,16 +53,29 @@ def ordinal(num):
return str ( num ) + suffix
class lcl :
def __init__ ( self , lcldir ) :
self . points = self . dat_to_dict ( lcldir + ' POINTS.DAT ' , ' ISO 8859-15 ' , ' LCD ' )
self . poffsets = self . dat_to_dict ( lcldir + ' POFFSETS.DAT ' , ' ISO 8859-15 ' , ' LCD ' )
try :
#self.points= self.dat_to_dict(lcldir+'POINTS.DAT','ISO 8859-15','LCD')
#self.poffsets= self.dat_to_dict(lcldir+'POFFSETS.DAT','ISO 8859-15','LCD')
self . names = self . dat_to_dict ( lcldir + ' NAMES.DAT ' , ' ISO 8859-15 ' , ' NID ' )
self . roads = self . dat_to_dict ( lcldir + ' ROADS.DAT ' , ' ISO 8859-15 ' , ' LCD ' )
self . segments = self . dat_to_dict ( lcldir + ' SEGMENTS.DAT ' , ' ISO 8859-15 ' , ' LCD ' )
self . allocated_codes = self . dat_to_dict ( lcldir + ' LOCATIONCODES.DAT ' , ' ISO 8859-15 ' , ' LCD ' )
self . areas = self . dat_to_dict ( lcldir + ' ADMINISTRATIVEAREA.DAT ' , ' ISO 8859-15 ' , ' LCD ' )
#self.roads=self.dat_to_dict(lcldir+'ROADS.DAT','ISO 8859-15','LCD')
#self.segments=self.dat_to_dict(lcldir+'SEGMENTS.DAT','ISO 8859-15','LCD')
#self.allocated_codes=self.dat_to_dict(lcldir+'LOCATIONCODES.DAT','ISO 8859-15','LCD')
#self.areas=self.dat_to_dict(lcldir+'ADMINISTRATIVEAREA.DAT','ISO 8859-15','LCD')
self . allocated_codes = self . dat_to_tuple_dict ( lcldir + ' LOCATIONCODES.DAT ' , ' ISO 8859-15 ' , ' Code ' )
self . points = self . dat_to_tuple_dict ( lcldir + ' POINTS.DAT ' , ' ISO 8859-15 ' , ' Point ' )
self . poffsets = self . dat_to_tuple_dict ( lcldir + ' POFFSETS.DAT ' , ' ISO 8859-15 ' , ' POffset ' )
self . roads = self . dat_to_tuple_dict ( lcldir + ' ROADS.DAT ' , ' ISO 8859-15 ' , ' Road ' )
self . segments = self . dat_to_tuple_dict ( lcldir + ' SEGMENTS.DAT ' , ' ISO 8859-15 ' , ' Segment ' )
self . areas = self . dat_to_tuple_dict ( lcldir + ' ADMINISTRATIVEAREA.DAT ' , ' ISO 8859-15 ' , ' Area ' )
#code.interact(local=locals())
except IOError as e :
print ( e )
print ( " location table not found " )
def lcn_allocated ( self , LCN ) :
return bool ( self . allocated_codes [ LCN ] [ " ALLOCATED " ] )
if self . allocated_codes . has_key ( LCN ) :
return self . allocated_codes [ LCN ] . ALLOCATED == u " 1 "
else :
return False
def get_poffsets ( self , LCD ) :
return self . poffsets [ LCD ]
def get_segment ( self , LCD ) :
@ -67,6 +88,18 @@ class lcl:
return self . points [ LCD ]
def get_name ( self , NID ) :
return self . names [ NID ] [ " NAME " ]
def dat_to_tuple_dict ( self , filename , encoding , tuple_name ) :
csv_reader = csv . reader ( open ( filename ) , delimiter = ' ; ' , quotechar = ' " ' )
header = csv_reader . next ( )
ret_dict = { }
tupleClass = namedtuple ( tuple_name , " " . join ( header ) )
for row in csv_reader :
# decode ISO 8859-15 back to Unicode, cell by cell: #TODO read encoding from README.DAT
unirow = [ unicode ( cell , encoding ) for cell in row ]
linetuple = tupleClass ( * unirow ) # "*" unpacks the list
lcn = int ( linetuple . LCD )
ret_dict [ lcn ] = linetuple
return ret_dict
def dat_to_dict ( self , filename , encoding , id_col_name ) :
csv_reader = csv . reader ( open ( filename ) , delimiter = ' ; ' , quotechar = ' " ' )
header = csv_reader . next ( )
@ -203,18 +236,19 @@ class tmc_area:
self . is_valid = False
if self . lcl_obj . lcn_allocated ( lcn ) :
if self . lcl_obj . areas . has_key ( lcn ) :
loc_dict = self . lcl_obj . areas [ lcn ]
self . ltype = loc_dict [ u ' CLASS ' ] + loc_dict [ u ' TCD ' ]
area = self . lcl_obj . areas [ lcn ]
self . ltype = area . CLASS + area . TCD
try :
self . subtype = int ( loc_dict [ u ' STCD ' ] )
self . subtype = int ( area . STCD )
except ValueError : #should not happen, all rows have int
self . subtype = 0
print ( " location subtype %s is invalid in location %i " % ( loc_dict [ u ' STCD ' ] , lcn ) )
print ( " location subtype %s is invalid in location %i " % ( area . STCD , lcn ) )
loc_dict = self . lcl_obj . areas [ lcn ]
self . roadnumber = loc_dict [ ' ROADNUMBER ' ]
if not loc_dict [ ' NID ' ] == u " " :
self . name = self . lcl_obj . get_name ( int ( loc_dict [ ' NID ' ] ) )
if not area . NID == u " " :
self . name = self . lcl_obj . get_name ( int ( area . NID ) )
self . is_valid = True
elif self . tableobj . log or self . tableobj . debug :
print ( " area not found %i " % lcn )
elif self . tableobj . log or self . tableobj . debug :
print ( " lcn not allocated %i " % lcn )
class tmc_segment :
@ -228,25 +262,28 @@ class tmc_segment:
self . second_name = " "
self . is_valid = False
if self . lcl_obj . lcn_allocated ( lcn ) :
loc_dic t= None
segmen t= None
if self . lcl_obj . segments . has_key ( lcn ) :
loc_dic t= self . lcl_obj . segments [ lcn ]
segmen t= self . lcl_obj . segments [ lcn ]
elif self . lcl_obj . roads . has_key ( lcn ) :
loc_dict = self . lcl_obj . roads [ lcn ]
if not loc_dict == None :
self . ltype = loc_dict [ u ' CLASS ' ] + loc_dict [ u ' TCD ' ]
segment = self . lcl_obj . roads [ lcn ]
elif self . tableobj . log or self . tableobj . debug :
print ( " segment/road not found %i " % lcn )
code . interact ( local = locals ( ) )
if not segment == None :
self . ltype = segment . CLASS + segment . TCD
try :
self . subtype = int ( loc_dict [ u ' STCD ' ] )
self . subtype = int ( segment . STCD )
except ValueError : #should not happen, all rows have int
self . subtype = 0
print ( " location subtype %s is invalid in location %i " % ( loc_dict [ u ' STCD ' ] , lcn ) )
self . roadnumber = loc_dict [ ' ROADNUMBER ' ]
if not loc_dict [ ' RNID ' ] == u " " :
self . roadname = self . lcl_obj . get_name ( int ( loc_dict [ ' RNID ' ] ) )
if not loc_dict [ ' N1ID ' ] == u " " :
self . first_name = self . lcl_obj . get_name ( int ( loc_dict [ ' N1ID ' ] ) )
if not loc_dict [ ' N2ID ' ] == u " " :
self . second_name = self . lcl_obj . get_name ( int ( loc_dict [ ' N2ID ' ] ) )
print ( " location subtype %s is invalid in location %i " % ( segment . STCD , lcn ) )
self . roadnumber = segment . ROADNUMBER
if not segment . RNID == u " " :
self . roadname = self . lcl_obj . get_name ( int ( segment . RNID ) )
if not segment . N1ID == u " " :
self . first_name = self . lcl_obj . get_name ( int ( segment . N1ID ) )
if not segment . N2ID == u " " :
self . second_name = self . lcl_obj . get_name ( int ( segment . N2ID ) )
self . is_valid = True
elif self . tableobj . log or self . tableobj . debug :
print ( " lcn not allocated %i " % lcn )
@ -257,7 +294,7 @@ class tmc_location:
else :
try :
loc_dict = self . lcl_obj . get_area ( lcn )
loc_name = self . lcl_obj . get_name ( int ( self . loc_dict [ ' N1ID ' ] ) )
loc_name = self . lcl_obj . get_name ( int ( self . point . N1ID ) )
aref = int ( loc_dict [ u ' POL_LCD ' ] )
return ( self . __ref_locs ( aref , name_string + " , " + loc_name ) )
except KeyError : #no area with lcn
@ -275,38 +312,40 @@ class tmc_location:
self . loc_dict = { }
self . has_koord = False
self . linRef = None
if self . lcl_obj . lcn_allocated ( lcn ) :
if self . lcl_obj . lcn_allocated ( lcn ) and self . lcl_obj . points . has_key ( lcn ) :
try :
self . loc_dic t= self . lcl_obj . get_point ( lcn )
self . poin t= self . lcl_obj . get_point ( lcn )
self . reflocs = self . __ref_locs ( lcn )
self . ltype = self . loc_dict [ u ' CLASS ' ] + self . loc_dict [ u ' TCD ' ]
self . ltype = self . point . CLASS + self . point . TCD
try :
self . subtype = int ( self . loc_dict [ u ' STCD ' ] )
self . subtype = int ( self . point . STCD )
except ValueError : #should not happen, all rows have int
self . subtype = 0
print ( " location subtype %s is invalid in location %i " % ( self . loc_dict [ u ' STCD ' ] , lcn ) )
if not self . loc_dict [ ' RNID ' ] == u " " :
self . roadname = self . lcl_obj . get_name ( int ( self . loc_dict [ ' RNID ' ] ) )
print ( " location subtype %s is invalid in location %i " % ( self . point . STCD , lcn ) )
if not self . point . RNID == u " " :
self . roadname = self . lcl_obj . get_name ( int ( self . point . RNID ) )
else :
self . roadname = " "
if not self . loc_dict [ ' N1ID ' ] == u " " :
self . first_name = self . lcl_obj . get_name ( int ( self . loc_dict [ ' N1ID ' ] ) )
if not self . point . N1ID == u " " :
self . first_name = self . lcl_obj . get_name ( int ( self . point . N1ID ) )
else :
self . first_name = " "
if not self . loc_dict [ ' N2ID ' ] == u " " :
self . second_name = self . lcl_obj . get_name ( int ( self . loc_dict [ ' N2ID ' ] ) )
if not self . point . N2ID == u " " :
self . second_name = self . lcl_obj . get_name ( int ( self . point . N2ID ) )
else :
self . second_name = " "
if not self . loc_dict [ ' ROA_LCD ' ] == u " " :
self . linRef = tmc_segment ( int ( self . loc_dict [ ' ROA_LCD ' ] ) , tableobj )
self . negative_offset = self . lcl_obj . get_poffsets ( lcn ) [ u " NEG_OFF_LCD " ]
self . positive_offset = self . lcl_obj . get_poffsets ( lcn ) [ u " POS_OFF_LCD " ]
if not self . point . SEG_LCD == u " " :
self . linRef = tmc_segment ( int ( self . point . SEG_LCD ) , tableobj )
elif not self . point . ROA_LCD == u " " :
self . linRef = tmc_segment ( int ( self . point . ROA_LCD ) , tableobj )
self . negative_offset = self . lcl_obj . get_poffsets ( lcn ) . NEG_OFF_LCD
self . positive_offset = self . lcl_obj . get_poffsets ( lcn ) . POS_OFF_LCD
try :
#koords stored in WGS84 format with decimal degrees multiplied with 10^5
self . xkoord = int ( self . loc_dict [ u " XCOORD " ] ) / 100000.0
self . ykoord = int ( self . loc_dict [ u " YCOORD " ] ) / 100000.0
self . xkoord = int ( self . point . XCOORD ) / 100000.0
self . ykoord = int ( self . point . YCOORD ) / 100000.0
self . koord_str = " %f , %f " % ( self . ykoord , self . xkoord )
self . koord_str_google = " { lat: %f , lng: %f } " % ( self . ykoord , self . xkoord )
self . google_maps_link = " https://www.google.de/maps/place/ %f , %f " % ( self . ykoord , self . xkoord )
@ -314,12 +353,13 @@ class tmc_location:
except ValueError :
self . has_koord = False
self . is_valid = True
if not self . loc_dict [ ' POL_LCD ' ] == " " : #Europe (lcn==34196) does not have an area reference
self . aref = tmc_area ( int ( self . loc_dict [ ' POL_LCD ' ] ) , tableobj )
except KeyError :
print ( " point ' %i ' not found " % lcn )
if not self . point . POL_LCD == " " : #Europe (lcn==34196) does not have an area reference
self . aref = tmc_area ( int ( self . point . POL_LCD ) , tableobj )
except KeyError as e :
print ( e )
print ( " error making point ' %i ' " % lcn )
elif self . tableobj . log or self . tableobj . debug :
print ( " lcn not allocated %i " % lcn )
print ( " lcn not allocated or not point %i " % lcn )
def get_extent_location ( self , loc , extent , direction ) : #direction: 0:pos, 1:neg
if extent == 0 or not loc . is_valid :
return loc
@ -350,16 +390,18 @@ class tmc_location:
def __repr__ ( self ) :
if not self . is_valid :
return " invalid lcn: %i " % ( self . lcn )
#elif self.ltype[0:2] == "P1": #junction
elif self . first_name == " " : #no first name-> use aref name
name = self . aref
else :
name = self . roadname + " , " + self . first_name
if self . has_koord :
return " %s , %i : %s , geo: %s " % ( self . ltype , self . subtype , name , self . koord_str )
#return '%s,%i:%s, geo:<a href="%s">%s</a>'%(self.ltype,self.subtype,name,self.google_maps_link,self.koord_str)
else :
return " %s , %i : %s " % ( self . ltype , self . subtype , name )
return unicode ( self ) . encode ( ' utf-8 ' ) + " lcn: %i " % ( self . lcn )
##elif self.ltype[0:2] == "P1": #junction
#elif self.first_name=="":#no first name-> use aref name
#name=self.aref
#else:
#name=self.roadname+","+self.first_name
#if self.has_koord:
#return "%s,%i:%s, geo:%s"%(self.ltype,self.subtype,name,self.koord_str)
##return '%s,%i:%s, geo:<a href="%s">%s</a>'%(self.ltype,self.subtype,name,self.google_maps_link,self.koord_str)
#else:
#return "%s,%i:%s"%(self.ltype,self.subtype,name)
#~ class tmc_location:
#~ def __init__(self,lcn,tableobj):
#~ self.tableobj=tableobj
@ -716,6 +758,8 @@ class tmc_message:
#EventCode: EventText
#F
#return unicode(text,encoding="utf-8")
elif self . location . is_valid :
print ( " linRef invalid on LCN %i " % self . location . lcn )
return text . encode ( ' utf-8 ' )
def __str__ ( self ) :
return str ( self . event . updateClass ) + " : " + self . getTime ( ) + " : " + self . events_string ( ) + " ; " + self . multi_str ( )