5555
5656log = logging .getLogger (__name__ )
5757
58+ BUFR_PARAMETER_MAPPING = {
59+ DwdRadarParameter .PE_ECHO_TOP : ["echoTops" ],
60+ DwdRadarParameter .PG_REFLECTIVITY : ["horizontalReflectivity" ],
61+ DwdRadarParameter .LMAX_VOLUME_SCAN : ["horizontalReflectivity" ],
62+ DwdRadarParameter .PX250_REFLECTIVITY : ["horizontalReflectivity" ],
63+ }
64+
65+ ECCODES_FOUND = ensure_eccodes ()
66+
5867
5968@dataclass
6069class RadarResult :
@@ -64,6 +73,8 @@ class RadarResult:
6473 """
6574
6675 data : BytesIO
76+ # placeholder for bufr files, which are read into pandas.DataFrame if eccodes available
77+ df : pl .DataFrame = field (default_factory = pl .DataFrame )
6778 timestamp : dt .datetime = None
6879 url : str = None
6980 filename : str = None
@@ -415,6 +426,69 @@ def query(self) -> Iterator[RadarResult]:
415426 verify_hdf5 (result .data )
416427 except Exception as e : # pragma: no cover
417428 log .exception (f"Unable to read HDF5 file. { e } " )
429+
430+ if self .format == DwdRadarDataFormat .BUFR :
431+ if ECCODES_FOUND and self .settings .read_bufr :
432+ buffer = result .data
433+
434+ # TODO: pdbufr currently doesn't seem to allow reading directly from BytesIO
435+ tf = tempfile .NamedTemporaryFile ("w+b" )
436+ tf .write (buffer .read ())
437+ tf .seek (0 )
438+
439+ df = pdbufr .read_bufr (
440+ tf .name ,
441+ columns = "data" ,
442+ flat = True
443+ )
444+
445+ value_vars = []
446+ parameters = BUFR_PARAMETER_MAPPING [self .parameter ]
447+ for par in parameters :
448+ value_vars .extend ([col for col in df .columns if par in col ])
449+ value_vars = set (value_vars )
450+ id_vars = df .columns .difference (value_vars )
451+ id_vars = [iv for iv in id_vars if iv .startswith ("#1#" )]
452+
453+ df = df .melt (id_vars = id_vars ,var_name = "parameter" ,value_vars = value_vars , value_name = "value" )
454+ df .columns = [col [3 :] if col .startswith ("#1#" ) else col for col in df .columns ]
455+
456+ df = df .rename (
457+ columns = {
458+ "stationNumber" : Columns .STATION_ID .value ,
459+ "latitude" : Columns .LATITUDE .value ,
460+ "longitude" : Columns .LONGITUDE .value ,
461+ "heightOfStation" : Columns .HEIGHT .value ,
462+ }
463+ )
464+
465+
466+ # df[Columns.STATION_ID.value] = df[Columns.STATION_ID.value].astype(int).astype(str)
467+
468+ date_columns = ["year" , "month" , "day" , "hour" , "minute" ]
469+ dates = df .loc [:, date_columns ].apply (
470+ lambda x : datetime (
471+ year = x .year , month = x .month , day = x .day , hour = x .hour , minute = x .minute
472+ ),
473+ axis = 1 ,
474+ )
475+ df .insert (len (df .columns ) - 1 , Columns .DATE .value , dates )
476+ df = df .drop (columns = date_columns )
477+
478+ def split_index_parameter (text : str ):
479+ split_index = text .index ("#" , 1 )
480+ if split_index == - 1 :
481+ return text , None
482+ index = text [1 :split_index ]
483+ parameter = text [split_index + 1 :]
484+ return parameter , float (index )
485+
486+ df [["parameter" , "index" ]] = df .pop ("parameter" ).map (split_index_parameter ).tolist ()
487+
488+ df = df .sort_values (["parameter" , "index" ])
489+
490+ result .df = df
491+
418492 yield result
419493
420494 @staticmethod
0 commit comments