language = "de"  # currently supported: de, en (both partially)

# Ordinal suffix for the final digits 1-3; every other final digit takes 'th'.
SUFFIXES = {1: 'st', 2: 'nd', 3: 'rd'}


def ordinal(num):
    """Return *num* as text with its English ordinal suffix appended.

    Examples: 1 -> '1st', 2 -> '2nd', 11 -> '11th', 23 -> '23rd'.
    """
    # Numbers whose last two digits fall in 10..20 never follow the
    # st/nd/rd pattern, so they are handled before the last-digit lookup.
    last_two = num % 100
    if last_two >= 10 and last_two <= 20:
        return str(num) + 'th'
    # Any last digit without an entry in SUFFIXES falls back to 'th'.
    return str(num) + SUFFIXES.get(num % 10, 'th')
class lcl:
    """In-memory copy of a TMC location code list read from its *.DAT files.

    Every table is stored as a dict keyed by the integer value of its id
    column; each value is a dict mapping column name to the cell text.
    """

    # (attribute name, file name, id column) of every table that is loaded
    _TABLES = (
        ('points', 'POINTS.DAT', 'LCD'),
        ('poffsets', 'POFFSETS.DAT', 'LCD'),
        ('names', 'NAMES.DAT', 'NID'),
        ('roads', 'ROADS.DAT', 'LCD'),
        ('segments', 'SEGMENTS.DAT', 'LCD'),
        ('allocated_codes', 'LOCATIONCODES.DAT', 'LCD'),
        ('areas', 'ADMINISTRATIVEAREA.DAT', 'LCD'),
    )
    # TODO read encoding from README.DAT
    _ENCODING = 'ISO 8859-15'

    def __init__(self, lcldir):
        """Load all tables found under *lcldir*.

        *lcldir* is used as a plain prefix of the file names, so it has to
        end with a path separator.
        """
        for attr, filename, id_col in self._TABLES:
            setattr(self, attr,
                    self.dat_to_dict(lcldir + filename, self._ENCODING, id_col))

    def lcn_allocated(self, LCN):
        """Return True if *LCN* is flagged as allocated in LOCATIONCODES.DAT.

        The flag is stored as cell text, so it must not be passed to bool()
        directly: bool("0") is True. A blank or "0" flag, or a code that is
        not listed at all, counts as not allocated.
        """
        try:
            flag = self.allocated_codes[LCN]["ALLOCATED"]
        except KeyError:
            return False
        return str(flag).strip() not in ("", "0")

    def get_poffsets(self, LCD):
        """Return the POFFSETS.DAT row of *LCD* (raises KeyError if absent)."""
        return self.poffsets[LCD]

    def get_segment(self, LCD):
        """Return the SEGMENTS.DAT row of *LCD* (raises KeyError if absent)."""
        return self.segments[LCD]

    def get_road(self, LCD):
        """Return the ROADS.DAT row of *LCD* (raises KeyError if absent)."""
        return self.roads[LCD]

    def get_area(self, LCD):
        """Return the ADMINISTRATIVEAREA.DAT row of *LCD* (raises KeyError if absent)."""
        return self.areas[LCD]

    def get_point(self, LCD):
        """Return the POINTS.DAT row of *LCD* (raises KeyError if absent)."""
        return self.points[LCD]

    def get_name(self, NID):
        """Return the NAME cell of name id *NID* (raises KeyError if absent)."""
        return self.names[NID]["NAME"]

    def dat_to_dict(self, filename, encoding, id_col_name):
        """Read a ';'-separated *.DAT file into a dict of row dicts.

        The first row is taken as the header. Every following non-empty row
        becomes a dict {column name: cell text}, stored under the integer
        value of its *id_col_name* cell. Cells are decoded with *encoding*.

        Raises KeyError if a row has no *id_col_name* column and ValueError
        if that cell is not an integer.
        """
        import sys  # local import: only needed to pick the Python 2/3 path

        is_py3 = sys.version_info[0] >= 3
        if is_py3:
            # the csv module wants text input with untranslated newlines
            handle = open(filename, newline='', encoding=encoding)
        else:
            # the Python 2 csv module works on bytes; cells are decoded below
            handle = open(filename, 'rb')

        ret_dict = {}
        with handle:  # make sure the file is closed again
            csv_reader = csv.reader(handle, delimiter=';', quotechar='"')
            header = next(csv_reader, None)
            if header is None:  # empty file
                return ret_dict
            if not is_py3:
                header = [cell.decode(encoding) for cell in header]
            for row in csv_reader:
                if not row:  # blank line (e.g. at the end of the file)
                    continue
                if not is_py3:
                    # decode back to Unicode, cell by cell
                    row = [cell.decode(encoding) for cell in row]
                linedict = dict(zip(header, row))
                ret_dict[int(linedict[id_col_name])] = linedict
        return ret_dict
class tmc_event:
    """One TMC event, looked up by its event code in ``tableobj.ecl_dict``.

    ``is_valid`` is False when the code (or its update class) is unknown;
    ``str()`` then yields an "invalid event" text. Optional length, speed
    limit and quantifier information is merged in with the ``add_*`` methods.
    """

    # Value table of quantifier type 1 ("numbers"): entry Q-1 is the number.
    _TYPE1_NUMBERS = (1, 2, 3, 4, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100,
                      150, 200, 250, 300, 350, 400, 450, 500, 550, 600, 650,
                      700, 750, 800, 850, 900, 950, 1000)

    def __init__(self, ecn, tableobj):
        """Look up event code *ecn* in ``tableobj.ecl_dict``.

        *tableobj* has to provide ``ecl_dict`` (event code -> row) and
        ``tmc_update_class_names`` (update class number -> name).
        """
        import re  # local import: the module does not import re at file level

        self.tableobj = tableobj
        self.ecn = ecn
        self.text_raw = "##Error##"
        self.name = "##Error##"
        self.length_str = None
        self.speed_limit_str = None
        self.is_cancellation = False
        self.updateClass = 999  # invalid
        self.is_valid = False

        # Columns of the event list (the row here starts after "Code"):
        # Text CEN-English, Text (German), Text (German) without quantifier,
        # Text (Quantifier = 1), Text (Quantifier > 1), N, Q, T, D, U, C, R
        try:
            event_array = self.tableobj.ecl_dict[ecn]
        except KeyError:
            print("event '%i' not found" % ecn)
            return

        self.text_noQ = event_array[2]
        if language == "de":
            self.text_raw = event_array[1]
            self.name = self.text_noQ
        else:
            self.text_raw = event_array[0]  # CEN-english
            # removes everything between parentheses
            # (assume no quantifier is used)
            self.name = re.sub(r"\(([^()]+)\)", "", self.text_raw)
        self.text_singleQ = event_array[3]
        self.text_pluralQ = event_array[4]
        # N: nature, (blank): information, F: forecast, S: silent
        self.nature = event_array[5]
        if event_array[0] == "message cancelled":
            self.is_cancellation = True
        # Q: quantifier type (0..12) or blank (no quantifier)
        self.quantifierType = event_array[6]
        # T: duration type, D: dynamic, L: long lasting
        self.durationType = event_array[7]
        # D: directionality, 1: unidirectional, 2: bidirectional;
        # cancellation messages don't have directionality
        directionality = event_array[8]
        self.is_unidirectional = directionality == "1"
        self.is_bidirectional = directionality == "2"
        # U: urgency, blank: normal, X: extremely urgent, U: urgent
        self.urgency = event_array[9]
        self.phrase_code = event_array[11]  # R: phrase code
        # C: update class. Reported separately so that a bad update class
        # is not announced as a missing event.
        try:
            self.updateClass = int(event_array[10])
            self.updateClassName = \
                self.tableobj.tmc_update_class_names[self.updateClass]
        except (KeyError, ValueError):
            print("event '%i' has unknown update class '%s'"
                  % (ecn, event_array[10]))
            return
        self.is_valid = True

    def change_directionality(self):
        """Invert both directionality flags."""
        self.is_unidirectional = not self.is_unidirectional
        self.is_bidirectional = not self.is_bidirectional

    def add_length(self, data):  # from label2
        """Store the affected length; ``str()`` substitutes it for "(L)"."""
        self.length_str = "%i km" % mgm_tag.length_to_km(data.uint)

    def add_speed_limit(self, data):  # from label3
        """Store the speed limit; ``data.uint`` counts steps of 5 km/h."""
        self.speed_limit_str = "%i km/h" % (data.uint * 5)

    def add_quantifier(self, data, bitLength):  # from label 4
        """Substitute the quantifier ``data.uint`` for "(Q)" in the event text.

        The raw value is interpreted according to ``self.quantifierType``.
        For the types handled explicitly (0, 1, 2, 4, 7, 8, 9) the formatted
        value replaces "(Q)" in ``text_pluralQ``; any other type appends the
        raw value to ``text_raw``. Without a quantifier type a message is
        printed and ``name`` is reset to ``text_raw``.
        """
        self.name = self.text_raw  # default
        Q_raw = data.uint
        # binary zero represents highest value
        # NOTE(review): 32 is only the maximum of a 5-bit field and bitLength
        # is not consulted -- confirm what an all-zero 8-bit quantifier means.
        Q = 32 if Q_raw == 0 else Q_raw
        qtype = self.quantifierType

        if qtype == "":
            print("cannot add quantifier %i to event ecn:%i"
                  % (Q_raw, self.ecn))
            return
        if qtype == "0":  # small numbers
            if Q <= 28:
                quantifier_string = str(Q)
            else:
                quantifier_string = str(30 + (Q - 29) * 2)  # 30,32,34,36
        elif qtype == "1":  # numbers
            quantifier_string = str(self._TYPE1_NUMBERS[Q - 1])
        elif qtype == "2":  # e.g. for visibility
            quantifier_string = "%i m" % (Q * 10)
        elif qtype == "4":  # speed in steps of 5 km/h
            quantifier_string = "%i km/h" % (Q * 5)
        elif qtype == "7":  # time of day in steps of 10 minutes
            hours = (Q - 1) // 6
            minutes = ((Q - 1) % 6) * 10
            # zero-pad the minutes: 8:00 instead of 8:0
            quantifier_string = "%i:%02i" % (hours, minutes)
        elif qtype == "8":  # weight in tonnes
            weight = Q * 0.1 if Q <= 100 else 10 + 0.5 * (Q - 100)
            # one decimal place; "%i" would cut 0.5 t down to 0 t
            quantifier_string = "%.1ft" % weight
        elif qtype == "9":  # length in metres
            length = Q * 0.1 if Q <= 100 else 10 + 0.5 * (Q - 100)
            quantifier_string = "%.1fm" % length
        else:  # other quantifier
            self.name = self.text_raw + "; Q(%s)=%s" % (
                qtype, "type:%s,raw:%i" % (qtype, Q))
            return
        # Types 1, 4 and 7 used to compute their text and then drop it;
        # every explicitly handled type is substituted the same way now.
        self.name = self.text_pluralQ.replace("(Q)", quantifier_string)

    def __str__(self):
        """Return the event text including length and speed limit, if set."""
        if not self.is_valid:
            return "invalid event, ecn:%i" % self.ecn
        retstr = self.name
        if self.length_str is not None:
            retstr = self.name.replace("(L)", self.length_str)
        if self.speed_limit_str is not None:
            if language == "de":
                retstr += " (geschw. begrenzt: %s)" % self.speed_limit_str
            else:
                retstr += " (speed limit: %s)" % self.speed_limit_str
        return retstr

    def __repr__(self):
        return "ecn:%i" % self.ecn
class tmc_location:
    """A TMC point location resolved against a location code list object.

    *lcl_obj* has to provide ``lcn_allocated``, ``get_point``,
    ``get_poffsets``, ``get_name`` and ``get_area``. ``is_valid`` is True
    only when the code is allocated and its point row could be loaded
    completely.
    """

    def __ref_locs(self, lcn, name_string=""):
        """Collect the first names along the area chain that starts at *lcn*.

        Follows POL_LCD from area to area and appends ",<name>" per area.
        The walk stops at Europe (34196), at a code that is not an area, at
        a blank or self-referencing POL_LCD, or at a missing name.
        """
        # NOTE(review): the walk starts at the location's own code, which is
        # looked up in the area table -- confirm whether it should start at
        # the point's POL_LCD instead.
        if lcn == 34196:  # europe
            return name_string
        try:
            area = self.lcl_obj.get_area(lcn)
            loc_name = self.lcl_obj.get_name(int(area['N1ID']))
        except (KeyError, ValueError):
            return name_string
        name_string = name_string + "," + loc_name
        parent = self._to_lcd(area.get(u'POL_LCD', u""))
        if parent is None or parent == lcn:
            return name_string
        return self.__ref_locs(parent, name_string)

    @staticmethod
    def _to_lcd(text):
        """Return *text* as an integer location code, or None if it is not one."""
        try:
            return int(text)
        except (TypeError, ValueError):
            return None

    def _name_for(self, nid_text):
        """Return the name of name id *nid_text*; a blank id gives ""."""
        if nid_text == u"":
            return u""
        return self.lcl_obj.get_name(int(nid_text))

    def __getitem__(self, item):
        """Return column *item* of the raw point row."""
        return self.loc_dict[item]

    def __init__(self, lcn, lcl_obj):
        """Load location code *lcn* from *lcl_obj*."""
        self.lcn = lcn
        self.lcl_obj = lcl_obj
        self.is_valid = False
        self.loc_dict = {}
        self.has_koord = False
        self.linRef = None
        self.aref = None
        self.negative_offset = None
        self.positive_offset = None
        self.roadnumber = ""
        self.roadname = ""
        self.first_name = ""
        self.second_name = ""
        self.reflocs = self.__ref_locs(lcn)

        if not self.lcl_obj.lcn_allocated(lcn):
            # This class has no table object of its own; the optional
            # log/debug switches are read from lcl_obj if it defines them.
            if getattr(lcl_obj, "log", False) or getattr(lcl_obj, "debug", False):
                print("lcn not allocated %i" % lcn)
            return
        try:
            self._load_point(lcn)
        except KeyError:
            self.is_valid = False
            print("point '%i' not found" % lcn)

    def _load_point(self, lcn):
        """Fill the attributes from the point row of *lcn*.

        Raises KeyError if the point, one of its columns or a referenced
        name is missing.
        """
        point = self.lcl_obj.get_point(lcn)
        self.loc_dict = point
        self.ltype = point[u'CLASS'] + point[u'TCD']
        try:
            self.subtype = int(point[u'STCD'])
        except ValueError:  # should not happen, all rows have int
            self.subtype = 0
            print("location subtype %s is invalid in location %i"
                  % (point[u'STCD'], lcn))
        self.roadnumber = ""
        self.roadname = self._name_for(point['RNID'])
        self.first_name = self._name_for(point['N1ID'])
        self.second_name = self._name_for(point['N2ID'])

        road_lcd = self._to_lcd(point['ROA_LCD'])
        if road_lcd is not None:
            self.linRef = tmc_location(road_lcd, self.lcl_obj)

        # A point without an offset row, or with blank cells, has no neighbour.
        try:
            offsets = self.lcl_obj.get_poffsets(lcn)
        except KeyError:
            offsets = {}
        self.negative_offset = self._to_lcd(offsets.get(u"NEG_OFF_LCD", u""))
        self.positive_offset = self._to_lcd(offsets.get(u"POS_OFF_LCD", u""))

        try:
            # koords stored in WGS84 format with decimal degrees
            # multiplied with 10^5
            self.xkoord = int(point[u"XCOORD"]) / 100000.0
            self.ykoord = int(point[u"YCOORD"]) / 100000.0
            self.koord_str = "%f,%f" % (self.ykoord, self.xkoord)
            self.koord_str_google = "{lat: %f, lng: %f}" % (self.ykoord, self.xkoord)
            self.google_maps_link = "https://www.google.de/maps/place/%f,%f" % (
                self.ykoord, self.xkoord)
            self.has_koord = True
        except ValueError:
            self.has_koord = False

        if lcn != 34196:  # Europe does not have an area reference
            area_lcd = self._to_lcd(point['POL_LCD'])
            if area_lcd is not None:
                self.aref = tmc_location(area_lcd, self.lcl_obj)
        self.is_valid = True

    def get_extent_location(self, loc, extent, direction):
        """Walk *extent* offset steps from *loc* and return the location reached.

        direction: 0: positive offsets, anything else: negative offsets.
        The walk ends early at an invalid location or where no further
        offset is stored.
        """
        current = loc
        remaining = extent
        while remaining != 0 and current.is_valid:
            if direction == 0:
                offset = current.positive_offset
            else:
                offset = current.negative_offset
            if offset is None or offset == "":
                break
            current = tmc_location(int(offset), self.lcl_obj)
            remaining -= 1
        return current

    def __str__(self):
        """Return "<type>,<subtype>:<name>" without coordinates."""
        if not self.is_valid:
            return "invalid lcn:%i" % (self.lcn)
        if self.ltype == "P1" and self.subtype == 1:
            # motorway intersection
            # TODO: only add if name does not already contain "Kreuz"
            name = "Kreuz " + self.first_name
        elif self.ltype == "P1" and self.subtype == 2:
            # motorway triangle
            # TODO: only add if name does not already contain "Dreieck"
            name = "Dreieck " + self.first_name
        elif not self.roadname == "":
            name = "STR:" + self.roadname  # TODO remove debug
        elif not self.first_name == "":
            name = self.first_name
        else:
            name = "NO NAME lcn:%i" % (self.lcn)
        return "%s,%i:%s" % (self.ltype, self.subtype, name)  # no geo

    def __repr__(self):
        """Return "<type>,<subtype>:<name>", plus coordinates if known."""
        if not self.is_valid:
            return "invalid lcn:%i" % (self.lcn)
        if self.first_name == "":  # no first name -> use aref name
            name = self.aref
        else:
            name = self.roadname + "," + self.first_name
        if self.has_koord:
            return "%s,%i:%s, geo:%s" % (self.ltype, self.subtype, name, self.koord_str)
        return "%s,%i:%s" % (self.ltype, self.subtype, name)
#~ self.is_valid=False #~ if self.lcl_obj.lcn_allocated(lcn): #~ pass #~ elif self.tableobj.log or self.tableobj.debug: #~ print("lcn not allocated %i"%lcn) #~ try: #~ loc_array=tableobj.lcl_dict[lcn] #~ self.ltype=loc_array[0] #~ try: #~ self.subtype=int(loc_array[1]) #~ except ValueError:#should not happen, all rows have int #~ self.subtype=0 #~ print("location subtype %s is invalid in location %i"%(loc_array[1],lcn)) #~ self.roadnumber=loc_array[2] #~ self.roadname=loc_array[3] #~ self.first_name=loc_array[4] #~ self.second_name=loc_array[5] #~ if not loc_array[7]=="": #~ self.linRef=tmc_location(int(loc_array[7]),tableobj) #~ self.negative_offset=loc_array[8] #~ self.positive_offset=loc_array[9] #~ try: #~ #koords stored in WGS84 format with decimal degrees multiplied with 10^5 #~ self.xkoord=int(loc_array[27])/100000.0 #~ self.ykoord=int(loc_array[28])/100000.0 #~ self.koord_str="%f,%f"%(self.ykoord,self.xkoord) #~ self.koord_str_google="{lat: %f, lng: %f}"%(self.ykoord,self.xkoord) #~ self.google_maps_link="https://www.google.de/maps/place/%f,%f"%(self.ykoord,self.xkoord) #~ self.has_koord=True #~ except ValueError: #~ self.has_koord=False #~ self.is_valid=True #~ if not lcn==34196:#Europe does not have an area reference #~ self.aref=tmc_location(int(loc_array[6]),tableobj) #~ except KeyError: #~ #print("location '%i' not found"%lcn) #~ self.is_valid=False #~ ##LOCATIONCODE;TYPE;SUBTYPE;ROADNUMBER;ROADNAME;FIRST_NAME;SECOND_NAME;AREA_REFERENCE;LINEAR_REFERENCE;NEGATIVE_OFFSET;POSITIVE_OFFSET;URBAN;INTERSECTIONCODE;INTERRUPTS_ROAD;IN_POSITIVE;OUT_POSITIVE;IN_NEGATIVE;OUT_NEGATIVE;PRESENT_POSITIVE;PRESENT_NEGATIVE;EXIT_NUMBER;DIVERSION_POSITIVE;DIVERSION_NEGATIVE;VERÄNDERT;TERN;NETZKNOTEN_NR;NETZKNOTEN2_NR;STATION;X_KOORD;Y_KOORD;POLDIR;ADMIN_County;ACTUALITY;ACTIVATED;TESTED;SPECIAL1;SPECIAL2;SPECIAL3;SPECIAL4;SPECIAL5;SPECIAL6;SPECIAL7;SPECIAL8;SPECIAL9;SPECIAL10 #~ #def get_extent_location(self,extent,direction): #~ 
#__recursion_get_extent_location(self,extent,direction) #~ #def __recursion_get_extent_location(self,loc,extent,direction): #direction: 0:pos, 1:neg #~ def get_extent_location(self,loc,extent,direction): #direction: 0:pos, 1:neg #~ #print(loc.lcn) #~ #print(extent) #~ #print(direction) #~ if extent==0 or not loc.is_valid: #~ return loc #~ else: #~ offset=loc.positive_offset if direction==0 else loc.negative_offset #~ if offset=="": #~ return loc #~ else: #~ offset_loc=tmc_location(int(offset),self.tableobj) #~ #return __recursion_get_extent_location(offset_loc,extent-1,direction) #~ return offset_loc.get_extent_location(offset_loc,extent-1,direction) #~ def __ref_locs(self,lcn,name_string=""): #~ #if not self.is_valid: #not used, since not called from outside #~ # return "" #~ if(lcn==34196):#europe #~ return(name_string) #~ else: #~ try: #~ locarray=self.tableobj.lcl_dict[lcn] #~ aref=int(locarray[6]) #~ loc_name=locarray[4] #~ return(self.__ref_locs(aref,name_string+","+loc_name)) #~ #return(loc_name) #~ except KeyError: #~ return(name_string) #~ def __str__(self): #~ if not self.is_valid: #~ return "invalid lcn:%i"%(self.lcn) #~ elif self.ltype=="P1" and self.subtype==1:#autobahnkreuz TODO:only add if name does not already contain "Kreuz" #~ name="Kreuz "+self.first_name #~ elif self.ltype=="P1" and self.subtype==2:#autobahndreieck TODO:only add if name does not already contain "Dreieck" #~ name="Dreieck "+self.first_name #~ elif not self.roadname=="": #~ name="STR:"+self.roadname#TODO remove debug #~ elif not self.first_name=="": #~ name=self.first_name #~ else: #~ name="NO NAME lcn:%i"%(self.lcn) #~ return "%s,%i:%s"%(self.ltype,self.subtype,name)#no geo #~ def __repr__(self): #~ if not self.is_valid: #~ return "invalid lcn:%i"%(self.lcn) #~ #elif self.ltype[0:2] == "P1": #junction #~ elif self.first_name=="":#no first name-> use aref name #~ name=self.aref #~ else: #~ name=self.roadname+","+self.first_name #~ if self.has_koord: #~ return "%s,%i:%s, 
geo:%s"%(self.ltype,self.subtype,name,self.koord_str) #~ #return '%s,%i:%s, geo:%s'%(self.ltype,self.subtype,name,self.google_maps_link,self.koord_str) #~ else: #~ return "%s,%i:%s"%(self.ltype,self.subtype,name) #~ #if self.ltype[0]=="A":#area #~ #return "%s:%s"%(self.ltype,self.first_name) #~ #elif self.ltype[0]=="L":#line #~ #return "%s:%s"%(self.ltype,self.first_name) #~ #elif self.ltype[0]=="P":#point #~ #return "%s:%s"%(self.ltype,self.first_name) class tmc_dict: "dict of tmc messages sorted by location (LCN) and update class, automatically deletes/updates invalid(ated) items" marker_template="addMarker({loc},'{text}',{endloc})" def __init__(self): self.messages={} self.message_list=[] def add(self,message): self.message_list.append(message) try: lcn=message.location.lcn updateClass=message.event.updateClass if not self.messages.has_key(lcn): self.messages[lcn]={} if message.event.is_cancellation: try: self.messages[lcn][updateClass].cancellation_time=message.getTime()#cancellation_time = rx time of cancellation message except KeyError:#no message to cancel message.event.name="no message to cancel" self.messages[lcn][updateClass]=message else: if self.messages[lcn].has_key(updateClass) and self.messages[lcn][updateClass].tmc_hash ==message.tmc_hash:#if same message -> add confirmation self.messages[lcn][updateClass].add_confirmation(message) else:#(over)write message self.messages[lcn][updateClass]=message #print("added message: "+str(message)) except AttributeError: print("ERROR, not adding: "+str(message)) def matchFilter(self,msg,filters): if not msg.location.is_valid: return True#always show invalid messages loc_str=str(msg.location)+str(msg.location.reflocs)+str(msg.location.roadnumber) for f in filters:#filters is list of dicts {"type":"event","str":"Stau"} stringlist=f["str"].lower().split(";") for string in stringlist: if f["type"]=="event" and unicode(str(msg.event), encoding="UTF-8").lower().find(string)==-1:#if event filter does not match return 
False elif f["type"]=="location" and unicode(loc_str, encoding="UTF-8").lower().find(string)==-1:#if location filter does not match return False return True def getLogString(self,filters): retStr="" for message in self.message_list: if self.matchFilter(message,filters): retStr+=message.log_string() retStr+="\n" retStr+=message.multi_str() retStr+="\n" return retStr def getMarkerString(self): markerstring="" try: for lcn in self.messages: loc=None endloc=None map_tag='
' for updateClass in self.messages[lcn]: message=self.messages[lcn][updateClass] if message.cancellation_time==None: color="black" else: color="gray" if message.location.has_koord: if loc==None:#first message at this location map_tag+='