dp3.scripts.datapoint_log_converter

Converts the legacy CSV DataPoint log format to JSON.

LegacyDataPointLoader

LegacyDataPointLoader(attr_config_dirname: str)

Loader of datapoint files as written by the DP3 API receiver.

Create a datapoint loader.

attr_config_dirname: Directory with attribute configuration (same as for DP3)

Source code in dp3/scripts/datapoint_log_converter.py
def __init__(self, attr_config_dirname: str):
    """
    Create a datapoint loader.

    attr_config_dirname: Directory with attribute configuration (same as for DP3)
    """
    # Load attribute config
    model_spec = ModelSpec(read_config_dir(attr_config_dirname))

    # Prepare a table for data type conversion
    # (to get data type from model_spec: model_spec[etype]["attribs"][attrname].data_type)
    self.dt_conv = {}  # map (etype,attr_name) -> conversion_function
    for etype, spec in model_spec.items():
        for aname, aspec in spec["attribs"].items():
            data_type = getattr(aspec, "data_type", None)
            converter = json.loads if data_type is None else get_converter(str(data_type))
            self.dt_conv[(etype, aname)] = converter

    self.model_spec = model_spec
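
A minimal construction sketch. The config directory path below is hypothetical; any directory accepted by DP3's read_config_dir works:

from dp3.scripts.datapoint_log_converter import LegacyDataPointLoader

# Use the same attribute configuration directory as the target DP3 instance
# ("./config/db_entities" is only an example path).
loader = LegacyDataPointLoader("./config/db_entities")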

read_dp_file

read_dp_file(filename: str) -> pd.DataFrame

Read a file with ADiCT/DP3 datapoints into a pandas DataFrame.

Values of attributes in datapoints are validated and converted according to the attribute configuration passed to the LegacyDataPointLoader constructor.

Source code in dp3/scripts/datapoint_log_converter.py
def read_dp_file(self, filename: str) -> pd.DataFrame:
    """
    Read a file with ADiCT/DP3 datapoints into a pandas DataFrame.

    Values of attributes in datapoints are validated and converted according
    to the attribute configuration passed to the LegacyDataPointLoader constructor.
    """
    open_function = gzip.open if filename.endswith(".gz") else open

    # Reformat datapoints file so "val" containing commas can be read properly.
    #   Replace first 7 commas (i.e. all except those inside "v") with semicolon
    #   Store as temporary file
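    #   e.g. b"x,x,x,x,x,x,x,[1,2]" -> b"x;x;x;x;x;x;x;[1,2]" (the comma inside "v" survives)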
    tmp_name = (
        f"tmp-{'.'.join(os.path.basename(os.path.normpath(filename)).split(sep='.')[:-1])}"
    )
    with open_function(filename, "rb") as infile, open(tmp_name, "wb") as outfile:
        for line in infile:
            outfile.write(line.replace(b",", b";", 7))
    # Load the converted file
    data = pd.read_csv(
        tmp_name,
        sep=";",
        header=None,
        names=self.COL_NAMES,
        index_col=False,
        converters={"c": float, "v": str},
        escapechar="\\",
        # parse_dates=["t1", "t2"],
        # infer_datetime_format=True,
    )
    # Cleanup
    if os.path.exists(tmp_name):
        os.remove(tmp_name)

    # Convert values to correct types according to model_spec
    def convert_row(row):
        try:
            row[2] = self.dt_conv[(row[0], row[1])](row[2])
        except KeyError as e:
            raise KeyError(f"No converter for {(row[0], row[1])}, with value {row[2]}.") from e
        except ValueError:
            self.log.error("ValueError in conversion, v: %s", row)
            return row
        return row

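    # Restrict conversion to attribute names known from the config;
    # rows with unknown attributes are dropped before returning.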
    attrs = {entity_attr[1] for entity_attr in self.dt_conv}
    conv_vals = data.loc[data["attr"].isin(attrs), ("type", "attr", "v")].apply(
        convert_row, axis=1, raw=True
    )
    if len(conv_vals) != len(data):
        self.log.warning(
            "Dropped %s rows due to missing attributes in config", len(data) - len(conv_vals)
        )
        self.log.info("Missing attrs: %s", [x for x in data["attr"].unique() if x not in attrs])
    data["v"] = conv_vals["v"]
    return data[data["attr"].apply(lambda x: x in attrs)]
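
A usage sketch, continuing from the loader constructed above. The input filename is hypothetical, and line-delimited JSON via pandas' to_json is used purely as an illustration of consuming the returned DataFrame:

# Read one legacy log (gzip is handled transparently) into a DataFrame...
df = loader.read_dp_file("dp_log-20230101.gz")
# ...and dump it as line-delimited JSON, one datapoint per line.
df.to_json(get_out_path("dp_log-20230101.gz", "./out"), orient="records", lines=True)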

get_converter

get_converter(attr_data_type: str) -> Callable[[str], Any]

Return a function that converts a string to the given data type.

Source code in dp3/scripts/datapoint_log_converter.py
def get_converter(attr_data_type: str) -> Callable[[str], Any]:
    """Return a function converting a string to given data type."""
    # basic type
    if attr_data_type in CONVERTERS:
        return CONVERTERS[attr_data_type]
    # array<X>, set<X>, dict<X,Y,Z>
    if (
        re.match(re_array, attr_data_type)
        or re.match(re_set, attr_data_type)
        or re.match(re_dict, attr_data_type)
    ):
        return json.loads
    # link<X>
    if re.match(re_link, attr_data_type):
        return str
    # category<X; Y>
    if re.match(re_category, attr_data_type):
        return str
    raise ValueError(f"No conversion function for attribute type '{attr_data_type}'")
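
A few illustrative calls, assuming the module-level CONVERTERS table and the re_array/re_link regexes (defined in the module but not reproduced here) behave as the comments above indicate:

from dp3.scripts.datapoint_log_converter import get_converter

get_converter("array<int>")("[1, 2, 3]")  # -> [1, 2, 3] (parsed with json.loads)
get_converter("link<ip>")("192.0.2.1")    # -> "192.0.2.1" (links stay strings)
get_converter("nonsense")                 # raises ValueError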

get_out_path

get_out_path(in_file_path, output_dir)

Return the output file path based on the input file. The date is parsed from the input filename and inserted into the pattern "dp_log_{date}.json".

Source code in dp3/scripts/datapoint_log_converter.py
def get_out_path(in_file_path, output_dir):
    """
    Return the output file path based on the input file.
    The date is parsed from the input filename and inserted into the pattern "dp_log_{date}.json".
    """
    date = in_file_path.split("-")[-1]
    if date.endswith(".gz"):
        date = date[:-3]
    out_filename = f"dp_log_{date}.json"
    return os.path.join(output_dir, out_filename)
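
A worked trace with a hypothetical input filename:

get_out_path("/data/logs/dp_log-20230101.gz", "./out")
# split("-")[-1] -> "20230101.gz"; the ".gz" suffix is stripped -> "20230101"
# -> "./out/dp_log_20230101.json"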