These code snippets are used to process the parquet files from fs-data into a common-ish schema for database ingest and analysis.

Oct 24 DBC Derived Schema


  optional double field_id=-1 Time_ms;

  optional boolean field_id=-1 SME_THROTL_PowerReady;

  optional float field_id=-1 SME_THROTL_MBB_Alive;

  optional boolean field_id=-1 SME_THROTL_Reverse;

  optional boolean field_id=-1 SME_THROTL_Forward;

  optional float field_id=-1 SME_THROTL_MaxSpeed;

  optional float field_id=-1 SME_THROTL_TorqueDemand;

  optional float field_id=-1 SME_CURRLIM_ChargeCurrentLim;

  optional float field_id=-1 SME_CURRLIM_DischargeCurrentLim;

  optional boolean field_id=-1 SME_TRQSPD_SOC_Low_Traction;

  optional boolean field_id=-1 SME_TRQSPD_SOC_Low_Hydraulic;

  optional boolean field_id=-1 SME_TRQSPD_Reverse;

  optional boolean field_id=-1 SME_TRQSPD_Forward;

  optional boolean field_id=-1 SME_TRQSPD_Park_Brake;

  optional boolean field_id=-1 SME_TRQSPD_Pedal_Brake;

  optional boolean field_id=-1 SME_TRQSPD_Controller_Overtermp;

  optional boolean field_id=-1 SME_TRQSPD_Key_switch_overvolt;

  optional boolean field_id=-1 SME_TRQSPD_Key_switch_undervolt;

  optional boolean field_id=-1 SME_TRQSPD_Running;

  optional boolean field_id=-1 SME_TRQSPD_Traction;

  optional boolean field_id=-1 SME_TRQSPD_Hydraulic;

  optional boolean field_id=-1 SME_TRQSPD_Powering_Enabled;

  optional boolean field_id=-1 SME_TRQSPD_Powering_Ready;

  optional boolean field_id=-1 SME_TRQSPD_contactor_closed;

  optional boolean field_id=-1 SME_TRQSPD_Precharging;

  optional float field_id=-1 SME_TRQSPD_Speed;

  optional float field_id=-1 SME_TRQSPD_MotorFlags;

  optional float field_id=-1 SME_TRQSPD_Torque;

  optional float field_id=-1 SME_TEMP_FaultLevel;

  optional float field_id=-1 SME_TEMP_FaultCode;

  optional float field_id=-1 SME_TEMP_DC_Bus_V;

  optional float field_id=-1 SME_TEMP_ControllerTemperature;

  optional float field_id=-1 SME_TEMP_MotorTemperature;

  optional float field_id=-1 SME_TEMP_BusCurrent;

  optional boolean field_id=-1 ACC_STATUS_BMS_FAULT;

  optional boolean field_id=-1 ACC_STATUS_IMD_FAULT;

  optional boolean field_id=-1 ACC_STATUS_SHUTDOWN_STATE;

  optional boolean field_id=-1 ACC_STATUS_PRECHARGE_DONE;

  optional boolean field_id=-1 ACC_STATUS_PRECHARGING;

  optional boolean field_id=-1 ACC_STATUS_CELL_TOO_LOW;

  optional boolean field_id=-1 ACC_STATUS_CELL_TOO_HIGH;

  optional boolean field_id=-1 ACC_STATUS_TEMP_TOO_LOW;

  optional boolean field_id=-1 ACC_STATUS_TEMP_TOO_HIGH;

  optional boolean field_id=-1 ACC_STATUS_TEMP_TOO_HIGH_CRG;

  optional boolean field_id=-1 ACC_STATUS_CHARGING;

  optional float field_id=-1 ACC_STATUS_GLV_VOLTAGE;

  optional double field_id=-1 ACC_STATUS_CELL_FAULT_INDEX;

  optional boolean field_id=-1 ACC_STATUS_BALANCING;

  optional float field_id=-1 ACC_POWER_PACK_VOLTAGE;

  optional float field_id=-1 ACC_POWER_SOC;

  optional float field_id=-1 ACC_POWER_CURRENT;

  optional float field_id=-1 ACC_SEG0_VOLTS_CELL0;

  optional float field_id=-1 ACC_SEG0_VOLTS_CELL1;

  optional float field_id=-1 ACC_SEG0_VOLTS_CELL2;

  optional float field_id=-1 ACC_SEG0_VOLTS_CELL3;

  optional float field_id=-1 ACC_SEG0_VOLTS_CELL4;

  optional float field_id=-1 ACC_SEG0_VOLTS_CELL5;

  optional float field_id=-1 ACC_SEG0_TEMPS_CELL0;

  optional float field_id=-1 ACC_SEG0_TEMPS_CELL1;

  optional float field_id=-1 ACC_SEG0_TEMPS_CELL2;

  optional float field_id=-1 ACC_SEG0_TEMPS_CELL3;

  optional float field_id=-1 ACC_SEG0_TEMPS_CELL4;

  optional float field_id=-1 ACC_SEG0_TEMPS_CELL5;

  optional float field_id=-1 ACC_SEG1_VOLTS_CELL0;

  optional float field_id=-1 ACC_SEG1_VOLTS_CELL1;

  optional float field_id=-1 ACC_SEG1_VOLTS_CELL2;

  optional float field_id=-1 ACC_SEG1_VOLTS_CELL3;

  optional float field_id=-1 ACC_SEG1_VOLTS_CELL4;

  optional float field_id=-1 ACC_SEG1_VOLTS_CELL5;

  optional float field_id=-1 ACC_SEG1_TEMPS_CELL0;

  optional float field_id=-1 ACC_SEG1_TEMPS_CELL1;

  optional float field_id=-1 ACC_SEG1_TEMPS_CELL2;

  optional float field_id=-1 ACC_SEG1_TEMPS_CELL3;

  optional float field_id=-1 ACC_SEG1_TEMPS_CELL4;

  optional float field_id=-1 ACC_SEG1_TEMPS_CELL5;

  optional float field_id=-1 ACC_SEG2_VOLTS_CELL0;

  optional float field_id=-1 ACC_SEG2_VOLTS_CELL1;

  optional float field_id=-1 ACC_SEG2_VOLTS_CELL2;

  optional float field_id=-1 ACC_SEG2_VOLTS_CELL3;

  optional float field_id=-1 ACC_SEG2_VOLTS_CELL4;

  optional float field_id=-1 ACC_SEG2_VOLTS_CELL5;

  optional float field_id=-1 ACC_SEG2_TEMPS_CELL0;

  optional float field_id=-1 ACC_SEG2_TEMPS_CELL1;

  optional float field_id=-1 ACC_SEG2_TEMPS_CELL2;

  optional float field_id=-1 ACC_SEG2_TEMPS_CELL3;

  optional float field_id=-1 ACC_SEG2_TEMPS_CELL4;

  optional float field_id=-1 ACC_SEG2_TEMPS_CELL5;

  optional float field_id=-1 ACC_SEG3_VOLTS_CELL0;

  optional float field_id=-1 ACC_SEG3_VOLTS_CELL1;

  optional float field_id=-1 ACC_SEG3_VOLTS_CELL2;

  optional float field_id=-1 ACC_SEG3_VOLTS_CELL3;

  optional float field_id=-1 ACC_SEG3_VOLTS_CELL4;

  optional float field_id=-1 ACC_SEG3_VOLTS_CELL5;

  optional float field_id=-1 ACC_SEG3_TEMPS_CELL0;

  optional float field_id=-1 ACC_SEG3_TEMPS_CELL1;

  optional float field_id=-1 ACC_SEG3_TEMPS_CELL2;

  optional float field_id=-1 ACC_SEG3_TEMPS_CELL3;

  optional float field_id=-1 ACC_SEG3_TEMPS_CELL4;

  optional float field_id=-1 ACC_SEG3_TEMPS_CELL5;

  optional float field_id=-1 ACC_SEG4_VOLTS_CELL0;

  optional float field_id=-1 ACC_SEG4_VOLTS_CELL1;

  optional float field_id=-1 ACC_SEG4_VOLTS_CELL2;

  optional float field_id=-1 ACC_SEG4_VOLTS_CELL3;

  optional float field_id=-1 ACC_SEG4_VOLTS_CELL4;

  optional float field_id=-1 ACC_SEG4_VOLTS_CELL5;

  optional float field_id=-1 ACC_SEG4_TEMPS_CELL0;

  optional float field_id=-1 ACC_SEG4_TEMPS_CELL1;

  optional float field_id=-1 ACC_SEG4_TEMPS_CELL2;

  optional float field_id=-1 ACC_SEG4_TEMPS_CELL3;

  optional float field_id=-1 ACC_SEG4_TEMPS_CELL4;

  optional float field_id=-1 ACC_SEG4_TEMPS_CELL5;

  optional float field_id=-1 VDM_GPS_Latitude;

  optional float field_id=-1 VDM_GPS_Longitude;

  optional float field_id=-1 VDM_GPS_SPEED;

  optional float field_id=-1 VDM_GPS_ALTITUDE;

  optional float field_id=-1 VDM_GPS_TRUE_COURSE;

  optional float field_id=-1 VDM_GPS_SATELLITES_IN_USE;

  optional float field_id=-1 VDM_GPS_VALID1;

  optional float field_id=-1 VDM_GPS_VALID2;

  optional float field_id=-1 VDM_UTC_DATE_YEAR;

  optional float field_id=-1 VDM_UTC_DATE_MONTH;

  optional float field_id=-1 VDM_UTC_DATE_DAY;

  optional float field_id=-1 VDM_UTC_TIME_HOURS;

  optional float field_id=-1 VDM_UTC_TIME_MINUTES;

  optional float field_id=-1 VDM_UTC_TIME_SECONDS;

  optional float field_id=-1 VDM_X_AXIS_ACCELERATION;

  optional float field_id=-1 VDM_Y_AXIS_ACCELERATION;

  optional float field_id=-1 VDM_Z_AXIS_ACCELERATION;

  optional float field_id=-1 VDM_X_AXIS_YAW_RATE;

  optional float field_id=-1 VDM_Y_AXIS_YAW_RATE;

  optional float field_id=-1 VDM_Z_AXIS_YAW_RATE;

  optional float field_id=-1 ETC_STATUS_HE1;

  optional float field_id=-1 ETC_STATUS_HE2;

  optional float field_id=-1 ETC_STATUS_BRAKE_SENSE_VOLTAGE;

  optional boolean field_id=-1 ETC_STATUS_RTD_BUTTON;

  optional boolean field_id=-1 ETC_STATUS_RTDS;

  optional boolean field_id=-1 ETC_STATUS_REVERSE;

  optional boolean field_id=-1 ETC_STATUS_BRAKELIGHT;

  optional float field_id=-1 ETC_STATUS_PEDAL_TRAVEL;

  optional boolean field_id=-1 ETC_STATUS_RTD;

  optional boolean field_id=-1 ETC_STATUS_IMPLAUSIBILITY;

  optional boolean field_id=-1 ETC_STATUS_TS_ACTIVE;

  optional float field_id=-1 PDB_POWER_A_GLV_VOLTAGE;

  optional float field_id=-1 PDB_POWER_A_CURRENT_SHUTDOWN;

  optional float field_id=-1 PDB_POWER_A_CURRENT_ACC;

  optional float field_id=-1 PDB_POWER_A_CURRENT_ETC;

  optional float field_id=-1 PDB_POWER_A_CURRENT_BPS;

  optional float field_id=-1 PDB_POWER_A_CURRENT_TRACTIVE;

  optional float field_id=-1 PDB_POWER_A_CURRENT_BSPD;

  optional float field_id=-1 PDB_POWER_B_CURRENT_TELEMETRY;

  optional float field_id=-1 PDB_POWER_B_CURRENT_PDB;

  optional float field_id=-1 PDB_POWER_B_CURRENT_DASH;

  optional float field_id=-1 PDB_POWER_B_CURRENT_RTML;

  optional float field_id=-1 PDB_POWER_B_CURRENT_EXTRA_1;

  optional float field_id=-1 PDB_POWER_B_CURRENT_EXTRA_2;

  optional float field_id=-1 TMAIN_DATA_BRAKES_F;

  optional float field_id=-1 TMAIN_DATA_BRAKES_R;

  optional float field_id=-1 TPERIPH_FL_DATA_WHEELSPEED;

  optional float field_id=-1 TPERIPH_FL_DATA_SUSTRAVEL;

  optional float field_id=-1 TPERIPH_FL_DATA_STRAIN;

  optional float field_id=-1 TPERIPH_FL_DATA_SIDE_TIRE_TEMP;

  optional float field_id=-1 TPERIPH_FR_DATA_WHEELSPEED;

  optional float field_id=-1 TPERIPH_FR_DATA_SUSTRAVEL;

  optional float field_id=-1 TPERIPH_FR_DATA_STRAIN;

  optional float field_id=-1 TPERIPH_FR_DATA_SIDE_TIRE_TEMP;

  optional float field_id=-1 TPERIPH_BL_DATA_WHEELSPEED;

  optional float field_id=-1 TPERIPH_BL_DATA_SUSTRAVEL;

  optional float field_id=-1 TPERIPH_BL_DATA_STRAIN;

  optional float field_id=-1 TPERIPH_BL_DATA_SIDE_TIRE_TEMP;

  optional float field_id=-1 TPERIPH_BR_DATA_WHEELSPEED;

  optional float field_id=-1 TPERIPH_BR_DATA_SUSTRAVEL;

  optional float field_id=-1 TPERIPH_BR_DATA_STRAIN;

  optional float field_id=-1 TPERIPH_BR_DATA_SIDE_TIRE_TEMP;

  optional float field_id=-1 TPERIPH_FL_TIRETEMP_1;

  optional float field_id=-1 TPERIPH_FL_TIRETEMP_2;

  optional float field_id=-1 TPERIPH_FL_TIRETEMP_3;

  optional float field_id=-1 TPERIPH_FL_TIRETEMP_4;

  optional float field_id=-1 TPERIPH_FL_TIRETEMP_5;

  optional float field_id=-1 TPERIPH_FL_TIRETEMP_6;

  optional float field_id=-1 TPERIPH_FL_TIRETEMP_7;

  optional float field_id=-1 TPERIPH_FL_TIRETEMP_8;

  optional float field_id=-1 TPERIPH_FR_TIRETEMP_1;

  optional float field_id=-1 TPERIPH_FR_TIRETEMP_2;

  optional float field_id=-1 TPERIPH_FR_TIRETEMP_3;

  optional float field_id=-1 TPERIPH_FR_TIRETEMP_4;

  optional float field_id=-1 TPERIPH_FR_TIRETEMP_5;

  optional float field_id=-1 TPERIPH_FR_TIRETEMP_6;

  optional float field_id=-1 TPERIPH_FR_TIRETEMP_7;

  optional float field_id=-1 TPERIPH_FR_TIRETEMP_8;

  optional float field_id=-1 TPERIPH_BL_TIRETEMP_1;

  optional float field_id=-1 TPERIPH_BL_TIRETEMP_2;

  optional float field_id=-1 TPERIPH_BL_TIRETEMP_3;

  optional float field_id=-1 TPERIPH_BL_TIRETEMP_4;

  optional float field_id=-1 TPERIPH_BL_TIRETEMP_5;

  optional float field_id=-1 TPERIPH_BL_TIRETEMP_6;

  optional float field_id=-1 TPERIPH_BL_TIRETEMP_7;

  optional float field_id=-1 TPERIPH_BL_TIRETEMP_8;

  optional float field_id=-1 TPERIPH_BR_TIRETEMP_1;

  optional float field_id=-1 TPERIPH_BR_TIRETEMP_2;

  optional float field_id=-1 TPERIPH_BR_TIRETEMP_3;

  optional float field_id=-1 TPERIPH_BR_TIRETEMP_4;

  optional float field_id=-1 TPERIPH_BR_TIRETEMP_5;

  optional float field_id=-1 TPERIPH_BR_TIRETEMP_6;

  optional float field_id=-1 TPERIPH_BR_TIRETEMP_7;

  optional float field_id=-1 TPERIPH_BR_TIRETEMP_8;

  optional float field_id=-1 COMMAND_MODE;

  optional float field_id=-1 COMMAND_COMMAND_SPECIFIER;

  optional float field_id=-1 RESPONSE_COMMAND_SPECIFIER;

  optional double field_id=-1 SMPC_CONTROL_PACK_VOLTAGE;

  optional float field_id=-1 SMPC_CONTROL_DEST_NODE_ID;

  optional boolean field_id=-1 SMPC_CONTROL_ENABLE;

  optional boolean field_id=-1 SMPC_CONTROL_CURRENT_10X_MULT;

  optional boolean field_id=-1 SMPC_CONTROL_EVSE_OVERRIDE;

  optional float field_id=-1 SMPC_MAX_DEST_NODE_ID;

  optional double field_id=-1 SMPC_MAX_CHRG_VOLTAGE;

  optional float field_id=-1 SMPC_MAX_CHRG_CURRENT;

  optional float field_id=-1 SMPC_MAX_INPUT_EVSE_OVERRIDE;

  optional float field_id=-1 SMPC_STATUS_CURRENT_MA;

  optional double field_id=-1 SMPC_STATUS_VOLTAGE_MV;

  optional boolean field_id=-1 SMPC_STATUS_J1772_DISCONN;

  optional boolean field_id=-1 SMPC_STATUS_REQUEST_EXCLUSIVE;

  optional boolean field_id=-1 SMPC_STATUS_CHARGER_UNPLUGGED;

  optional boolean field_id=-1 SMPC_STATUS_CHARGING;

  optional boolean field_id=-1 SMPC_STATUS_INTERNAL_FAULT;

  optional boolean field_id=-1 SMPC_STATUS_UNDERVOLTAGE_FAULT;

  optional boolean field_id=-1 SMPC_STATUS_OVERVOLTAGE_FAULT;

  optional boolean field_id=-1 SMPC_STATUS_OVERCURRENT_FAULT;

  optional boolean field_id=-1 SMPC_STATUS_OVERTEMP_FAULT;

  optional boolean field_id=-1 SMPC_STATUS_CHARGE_COMPLETE;

  optional boolean field_id=-1 SMPC_STATUS_READY;

  optional float field_id=-1 SMPC_MAX2_INPUT_CURR_LIM;

  optional float field_id=-1 SMPC_MAX2_EVSE_CURRENT;

  optional float field_id=-1 SMPC_MAX2_MAX_CURRENT_MA;

  optional double field_id=-1 SMPC_MAX2_MAX_VOLTAGE_MV;

  optional float field_id=-1 SMPC_INPUT_MAX_CHARGER_TEMP_C;

  optional float field_id=-1 SMPC_INPUT_AC_INPUT_FREQUENCY_HZ;

  optional float field_id=-1 SMPC_INPUT_AC_INPUT_CURRENT_MA;

  optional boolean field_id=-1 SMPC_INPUT_J1772_TRIGGERED;

  optional boolean field_id=-1 SMPC_INPUT_J1772_DISCONNECTED;

  optional boolean field_id=-1 SMPC_INPUT_J1772_CONNECTED;

  optional float field_id=-1 SMPC_INPUT_AC_INPUT_VOLTAGE;

  optional float field_id=-1 SMPC_SER_FIRMWARE_VER;

  optional double field_id=-1 SMPC_SER_SERIAL_NUMBER;

  optional float field_id=-1 SMPC_SER_PART_NUMBER;



Schema mapping tools

Group files in directory by schema

 
import pyarrow.parquet as pq
 
import os
 
import sys
 
from collections import defaultdict
 
 
 
def get_schema_signature(parquet_path):
    """Return a string signature of a parquet file's Arrow schema.

    The signature is built from field names and types only. Key/value
    metadata attached to the schema or to individual fields is left out:
    the default ``str(schema)`` rendering includes it (truncated, followed
    by a count of the characters that were cut), so two files with the same
    columns could otherwise produce different signatures and be grouped
    separately.

    Args:
        parquet_path: Path of the parquet file to inspect.

    Returns:
        The schema rendered as a string, or ``None`` when the file could
        not be read (the error is printed instead of raised).
    """
    try:
        arrow_schema = pq.ParquetFile(parquet_path).schema_arrow
        return arrow_schema.to_string(
            show_field_metadata=False,
            show_schema_metadata=False,
        )
    except Exception as e:
        # Deliberately broad: this is a best-effort scan, so an unreadable
        # or corrupt file is reported and skipped rather than aborting.
        print(f"Error reading {parquet_path}: {e}")
        return None
 
 
 
def group_by_schema(root_dir):
    """Group the parquet files found under ``root_dir`` by schema signature.

    The directory tree is walked recursively; only names ending in
    ``.parquet`` or ``.pq`` are considered.

    Args:
        root_dir: Directory to scan.

    Returns:
        A ``defaultdict(list)`` mapping each schema signature to the list
        of file paths that share it.
    """
    grouped = defaultdict(list)
    for folder, _subdirs, names in os.walk(root_dir):
        parquet_names = (n for n in names if n.endswith((".parquet", ".pq")))
        for name in parquet_names:
            path = os.path.join(folder, name)
            signature = get_schema_signature(path)
            if not signature:
                # A falsy signature (e.g. None for an unreadable file)
                # means the file is left out of every group.
                continue
            grouped[signature].append(path)
    return grouped
 
 
 
def main():
    """Command-line entry point: print every schema group under a directory.

    Expects exactly one argument, the directory to scan. With any other
    argument count a usage line is printed and the process exits with
    status 1.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python group_parquet_schemas.py <directory>")
        sys.exit(1)

    groups = group_by_schema(cli_args[0])

    print("\n=== Schema Groups ===\n")
    for group_no, (schema_text, paths) in enumerate(groups.items(), start=1):
        print(f"Group {group_no}: {len(paths)} file(s)")
        print("-" * 60)
        print(schema_text)
        print("Files:")
        for path in paths:
            print(f"  {path}")
        print("\n")
 
 
 
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
 
    main()
 
 
 

Group files in directory by schema and list differences from given standard schema parquet

 
import pyarrow.parquet as pq
 
import os
 
import sys
 
from collections import defaultdict
 
 
 
def get_schema_dict(parquet_path):
    """Map each field name of a parquet file to its Arrow type as a string.

    Args:
        parquet_path: Path of the parquet file to inspect.

    Returns:
        A dict ``{field_name: str(field_type)}`` built from the file's
        Arrow schema.
    """
    arrow_schema = pq.ParquetFile(parquet_path).schema_arrow
    name_to_type = {}
    for column in arrow_schema:
        name_to_type[column.name] = str(column.type)
    return name_to_type
 
 
 
def compare_schemas(standard_schema, test_schema):
    """Diff a schema dict against the standard schema dict.

    Args:
        standard_schema: Reference mapping of field name to type string.
        test_schema: Mapping of field name to type string to check.

    Returns:
        A 3-tuple of lists of field names ``(new, missing, mismatch)``:
        fields only in ``test_schema``, fields only in ``standard_schema``,
        and fields present in both whose types differ. ``new`` and
        ``mismatch`` follow ``test_schema`` order; ``missing`` follows
        ``standard_schema`` order.
    """
    added = []
    changed = []
    for name, type_str in test_schema.items():
        if name not in standard_schema:
            added.append(name)
        elif type_str != standard_schema[name]:
            changed.append(name)

    absent = [name for name in standard_schema if name not in test_schema]
    return added, absent, changed
 
 
 
def get_schema_signature(schema_dict):
    """Return a hashable key that identifies a schema dict.

    The key is a tuple of the dict's ``(name, type)`` pairs in sorted
    order, so two dicts with the same entries yield the same key
    regardless of insertion order.
    """
    return tuple(sorted(schema_dict.items()))
 
 
 
def group_by_schema(root_dir):
    """Recursively group parquet files under ``root_dir`` by identical schema.

    Only names ending in ``.parquet`` or ``.pq`` are considered. A file
    whose schema cannot be read is reported on stdout and skipped.

    Args:
        root_dir: Directory to scan.

    Returns:
        A pair ``(schema_groups, schema_dicts)``: a ``defaultdict(list)``
        from schema signature to the file paths sharing it, and a dict
        from the same signature to a schema dict of that group.
    """
    files_by_sig = defaultdict(list)
    dict_by_sig = {}
    for folder, _subdirs, names in os.walk(root_dir):
        for name in names:
            if not name.endswith((".parquet", ".pq")):
                continue
            path = os.path.join(folder, name)
            try:
                fields = get_schema_dict(path)
                key = get_schema_signature(fields)
                files_by_sig[key].append(path)
                dict_by_sig[key] = fields
            except Exception as e:
                print(f"Error reading {path}: {e}")
    return files_by_sig, dict_by_sig
 
 
 
def main():
    """Command-line entry point: diff every schema group against a standard.

    Expects two arguments: the directory to scan and the path of the
    parquet file whose schema is the standard. Prints, for each group of
    files sharing a schema, the file list and any new, missing, or
    type-mismatched fields relative to the standard. Exits with status 1
    on a wrong argument count or when the standard schema cannot be loaded.
    """
    if len(sys.argv) != 3:
        print("Usage: python group_and_compare_parquet_schemas.py <directory> <standard.parquet>")
        sys.exit(1)

    _, root_dir, standard_path = sys.argv

    # The reference schema is required; without it there is nothing to diff.
    try:
        standard_schema = get_schema_dict(standard_path)
        print(f"Loaded standard schema from: {standard_path}\n")
    except Exception as e:
        print(f"Error loading standard schema: {e}")
        sys.exit(1)

    schema_groups, schema_dicts = group_by_schema(root_dir)

    for group_no, (sig, paths) in enumerate(schema_groups.items(), start=1):
        candidate = schema_dicts[sig]
        added, absent, changed = compare_schemas(standard_schema, candidate)

        print(f"\n=== Schema Group {group_no} ({len(paths)} file(s)) ===")
        for path in paths:
            print(f"  {path}")

        if not added and not absent and not changed:
            print("✅ Matches standard schema exactly.")
            continue

        if added:
            print("🟩 New fields (not in standard):")
            for name in added:
                print(f"  + {name}: {candidate[name]}")
        if absent:
            print("🟥 Missing fields (in standard but not here):")
            for name in absent:
                print(f"  - {name}: {standard_schema[name]}")
        if changed:
            print("🟨 Type mismatches:")
            for name in changed:
                print(f"  * {name}: standard={standard_schema[name]} | here={candidate[name]}")
 
 
 
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
 
    main()
 
 
 

Schema Conforming Strategy

Timestamp

timestamp *1000 Time_ms

Time *1000 Time_ms

None Incrementing Time_ms field by 11.91

New fields

drop

Missing Fields

Ignore and set null