#!/usr/bin/env python3
#
# LabcomDataDat.py
#
# 2023-12-27 ver.1.0.0
# 2023-12-28 ver.1.0.1
#
import sys
import os
import struct
import datetime
import zlib


class LabcomDataCommon():
    """Formatting and export helpers shared by the LABCOM record classes.

    Every method reads attributes (``saved_year``, ``module_type``,
    ``ch_params``, ...) that are expected to be set on the instance by other
    code; nothing in this class assigns them.
    """

    # saved_time() labels the saved fields as "+0900", so the epoch value is
    # computed in that fixed zone rather than in the host's local zone.
    _SAVED_TZ = datetime.timezone(datetime.timedelta(hours=9))

    _MODULE_GROUP_NAMES = {
        0: 'NoGroup',
        3: 'ANY',
        1: 'CAMAC',
        2: 'WE7000',
        4: 'CompactPCI/PXI',
        5: 'WE7000',
    }

    _MODULE_TYPE_NAMES = {
        0: 'NoModule',
        1: 'Aurora14',
        2: 'TOYO629',
        3: 'SEGDCM',
        4: 'SEGADC',
        5: 'DMOD',
        6: 'CTS308',
        7: 'CTS419',
        8: 'CTS504',
        9: 'TEST',
        10: 'General',
        11: 'CANE_N209',
        12: 'CANE_N470',
        13: 'CPS412',
        14: 'HOUSIN_C012_1',
        15: 'HOUSIN_C012_2',
        16: 'DTS',
        17: 'CTS606B',
        18: 'RPC330',
        19: 'CPHA1210',
        20: 'MIC480_5',
        91: 'PXI6115',
        92: 'PXI6602',
        93: 'PXI1409',
        94: 'PXI1411',
        101: 'PXI6133',
        121: 'WE800',
        122: 'WE7272',
        123: 'WE7251',
        124: 'WE7275',
        125: 'WE7273',
        126: 'WE7562',
        127: 'WE7271',
    }

    _DATA_TYPE_NAMES = {1: 'RAW', 2: 'ANA', 3: 'PPM'}

    def __init__(self):
        self.dummy = 0

    def _saved_datetime(self) -> datetime.datetime:
        """Build the saved date/time fields into a +09:00 aware datetime.

        Raises:
            ValueError: if the fields do not form a valid calendar date/time.
        """
        return datetime.datetime(
            self.saved_year, self.saved_mon, self.saved_day,
            self.saved_hour, self.saved_min, self.saved_sec, 0,
            tzinfo=self._SAVED_TZ,
        )

    def timestamp(self) -> int:
        """Return the saved time as POSIX seconds.

        Returns 0 when the saved fields cannot form a valid date, so callers
        that test ``0 < timestamp()`` take their "no timestamp" path instead
        of crashing on an unset header.
        """
        try:
            return int(self._saved_datetime().timestamp())
        except (ValueError, OverflowError):
            return 0

    def saved_time(self) -> str:
        """Return the saved time as a double-quoted date string, or "UNDEF".

        "UNDEF" is returned for years below 71, for years strictly between
        200 and 1980, and for fields that do not form a valid date.
        """
        year = self.saved_year
        if 71 > year or (200 < year and 1980 > year):
            return "UNDEF"
        try:
            dt = self._saved_datetime()
        except ValueError:
            return "UNDEF"
        # The surrounding double quotes are part of the emitted value.
        return dt.strftime('"%a, %d %b %Y %H:%M:%S +0900"')

    def module_group_string(self):
        """Return the name for ``self.module_group``.

        An unknown code is printed to stdout and "UNDEF" is returned.
        """
        name = self._MODULE_GROUP_NAMES.get(self.module_group)
        if name is None:
            print('ModuleGroup: ', self.module_group)
            return "UNDEF"
        return name

    def module_type_string(self):
        """Return the name for ``self.module_type``, or "UNDEF" if unknown."""
        return self._MODULE_TYPE_NAMES.get(self.module_type, "UNDEF")

    def data_type_string(self):
        """Return the name for ``self.data_type``, or "DEF" if unknown."""
        # NOTE(review): the sibling lookups fall back to "UNDEF"; "DEF" is
        # kept as found because the value is written to output -- confirm
        # whether it is intentional.
        return self._DATA_TYPE_NAMES.get(self.data_type, "DEF")

    def make_channel_file(self, diag_name: str, shot_no: int, sub_shot: int,
                          savepath: str):
        """Write the ``.prm`` and ``.dat`` files for this channel.

        Both files are named
        ``<savepath>/<diag_name>-<shot_no>-<sub_shot>-<channel_number>`` and
        are opened in exclusive-create mode.

        Args:
            diag_name: diagnostics name; also used as the row prefix when the
                module type name cannot be determined.
            shot_no: shot number.
            sub_shot: sub-shot number.
            savepath: directory that receives the two files.

        Raises:
            FileExistsError: if either output file already exists.
        """
        stem = "%s/%s-%d-%d-%d" % (savepath, diag_name, shot_no, sub_shot,
                                   self.channel_number)

        # Best-effort prefix.  ``except Exception`` keeps the fallback while
        # letting KeyboardInterrupt / SystemExit propagate, which the former
        # bare ``except`` swallowed.
        try:
            mod_type = self.module_type_string()
        except Exception:
            mod_type = diag_name

        with open(stem + ".prm", mode='x') as f:
            f.write("%s,%s,%s,1\n" % (mod_type, "DiagnosticsName", diag_name))
            f.write("%s,%s,%d,4\n" % (mod_type, "Shot", shot_no))
            f.write("%s,%s,%d,4\n" % (mod_type, "SubShot", sub_shot))
            for ch_param in self.ch_params:
                f.write("%s,%s,%s,%d\n" % (mod_type, ch_param.name,
                                           ch_param.val, ch_param.type))

        with open(stem + ".dat", mode='xb') as f:
            if 0 < self.comp_length:
                # NOTE(review): indentation was lost in this file; the
                # ``else`` is reconstructed as belonging to the ZLIB test
                # (non-ZLIB payloads are written unchanged) -- confirm.
                if 'ZLIB' in self.comp_method.upper():
                    f.write(zlib.decompress(self.channel_comp_data))
                else:
                    f.write(self.channel_comp_data)

    def print_update_oodbms_sql(self):
        """Print an ``UPDATE oodbms`` statement for this shot to stdout.

        ``datestamp`` is included only when ``timestamp()`` is positive.
        """
        stamp = self.timestamp()  # evaluated once instead of twice
        # Double any single quote so the name stays inside its SQL literal.
        diag = str(self.diag_name).replace("'", "''")

        if 0 < stamp:
            assignments = (
                "data_len=%d, comp_len=%d, channel=%d, datestamp=%d, collect=%d"
                % (self.total_data_size, self.total_comp_size,
                   self.n_channels, stamp, self.c_channels)
            )
        else:
            assignments = (
                "data_len=%d, comp_len=%d, channel=%d, collect=%d"
                % (self.total_data_size, self.total_comp_size,
                   self.n_channels, self.c_channels)
            )

        # NOTE(review): the original literal used backslash continuations
        # whose leading whitespace was lost; clauses are joined with single
        # spaces here.
        print(
            "UPDATE oodbms SET %s WHERE real_arcshot=%d AND real_subshot=%d"
            " AND diag_id=(SELECT diag_id FROM diag WHERE diag_name='%s');"
            % (assignments, self.shot_no, self.sub_shot, diag)
        )

    def make_shot_dir(self, savepath: str):
        """Create ``savepath`` unless it is '.', and return it."""
        if savepath != '.':
            os.makedirs(savepath, exist_ok=True)
        return savepath


class LabcomDataHeader():
    """Fields decoded from the first 36 bytes of ``data_bin``."""

    def __init__(self, data_bin: bytes):
        self.data_len = len(data_bin)
        # 36byte
        self.bin_len = 36
        self.next_offset = self.bin_len
        # 4:fid1, 4:fid2,
        # 2:fver_majar, 2:ver_minor, 4:backup_stamp,
        # 2:apl_ver, 2:n_channels, 2:n_modules, 2:compiler_ver,
        # 4:copyright_offset, 4:note_offset,
        # 4:channel_offset
        header = struct.unpack_from('<2I2HI4H3I', data_bin, 0)
        self.fid1 = header[0]
        self.fid2 = header[1]
        # NOTE(review): indentation was lost in this file; every assignment
        # below is reconstructed as conditional on the file-id check --
        # confirm against the original.
        if (0xFFFFFFFF == self.fid1) and (0xFFFFFFFF == self.fid2):
            self.backup_stamp = header[4]
            self.n_channels = header[6]
            self.n_modules = header[7]
            self.channel_offset = header[11]
            self.c_channels = int((len(data_bin) - header[11]) / 4)
            if 0 == self.n_channels:
                # A zero channel count falls back to the computed count.
                self.n_channels = self.c_channels

    def check_dat_format(self) -> bool:
        """Return True when both file-id words are 0xFFFFFFFF."""
        if (0xFFFFFFFF == self.fid1) and (0xFFFFFFFF == self.fid2):
            return True
        else:
            return False

# NOTE(review): the text that followed check_dat_format was corrupted before
# it reached this file -- everything between a '<' and the next '>' is
# missing, which swallowed struct format strings and whole definitions.  The
# surviving text is kept verbatim below so it can be restored from the
# original LabcomDataDat.py; it is not valid Python as it stands.
#
#   def channel_offsets(self, data_bin: bytes) -> int:
#   #offset_start = len(data_bin)-self.n_channels*4
#   offset_start = self.channel_offset
#   ch_offsets = []
#   ofs_struct=struct.Struct(' int:
#   offset_start = self.bin_len
#   ch_offsets = []
#   ofs_struct=struct.Struct(' len(self.ver_no) :
#   self.ver_no.append('0')
#   tmp_params = []
#   tmp_params.append(LabcomDataChParam("ChannelNumber",self.channel_number,4))
#   tmp_params.append(LabcomDataChParam("ModuleGroup",self.module_group_string(),1)) # self.module_group
#   tmp_params.append(LabcomDataChParam("ModuleType",self.module_type_string(),1)) # self.module_type
#   tmp_params.append(LabcomDataChParam("ManagementVersion",self.manage_version,1))
#   tmp_params.append(LabcomDataChParam("DataType",self.data_type_string(),1)) # self.data_type
#   if 0 != self.value_len :
#   tmp_params.append(LabcomDataChParam("Resolution(bit)",self.value_len,4)) # self.value_len
#   tmp_params.append(LabcomDataChParam("DataLength(byte)",self.data_length,4))
#   tmp_params.append(LabcomDataChParam("CompLength(byte)",self.comp_length,4))
tmp_params.append(LabcomDataChParam("CompressionMethod",self.comp_method,1)) tmp_params.append(LabcomDataChParam("CompressionVersion",self.comp_version,1)) tmp_params.append(LabcomDataChParam("Comment",self.channel_note,1)) if '1' == self.ver_no[1] : term_idx = data_bin.index(0x00, next_idx) self.signal_name = data_bin[next_idx:term_idx].decode() # -'\0' next_idx = term_idx+1 term_idx = data_bin.index(0x00, next_idx) self.dts_source = data_bin[next_idx:term_idx].decode() # -'\0' next_idx = term_idx+1 term_idx = data_bin.index(0x00, next_idx) term_32 = next_idx+31 if 31 < (term_idx-next_idx) else term_idx self.dts_host_id = data_bin[next_idx:term_32].decode('shift_jis') # -'\0' next_idx = term_idx+1 term_idx = data_bin.index(0x00, next_idx) term_32 = next_idx+31 if 31 < (term_idx-next_idx) else term_idx self.dts_mod_id = data_bin[next_idx:term_32].decode('shift_jis') # -'\0' next_idx = term_idx+1 term_idx = data_bin.index(0x00, next_idx) self.dts_clk_ch = data_bin[next_idx:term_idx].decode() # -'\0' next_idx = term_idx+1 self.dts_trg_ch, self.dts_user_clk, = struct.unpack_from('= self.module_type : self.ch_params += self.mem_image_params( data_bin, next_idx, self.we72xx_pdefs()) return if 0x0a != self.module_type : return self.params_block_size, self.params_head_size \ = struct.unpack_from('<2I', data_bin, next_idx) # 8bytes self.n_params = int((self.params_head_size-4*2)/(4*4+2)) param_defines = [] next_idx += 8 for ofs in range(0, self.n_params): tmp=LabcomDataChParamStruct(data_bin, next_idx) param_defines.append(tmp) next_idx = tmp.next_offset ch_params_part = LabcomDataChParams(param_defines, data_bin, next_idx ) self.next_offset = ch_params_part.next_offset self.ch_params += ch_params_part.params self.bin_len = ch_params_part.next_offset - channel_offset # print(self.params_block_size,self.params_head_size, self.n_params) # for ch_param in self.ch_params: # print(ch_param.name, ch_param.val, ch_param.type) def mem_image_params(self, data_bin: bytes, 
chparam_offset: int, param_defs) : tmp_params = [] next_idx = chparam_offset for param in param_defs : try : if 1 == param[2] : term_idx = data_bin.index(0x00, next_idx) val = data_bin[next_idx:term_idx].decode() # -'\0' next_idx = term_idx+1 else : val, = struct.unpack_from(param[1], data_bin, next_idx) next_idx += struct.calcsize(param[1]) tmp_params.append(LabcomDataChParam(param[0], val, param[2] )) except Exception as e: print('Exception: ', param[0]) print(e) break return tmp_params def aurora14_pdefs(self) : pdefs = [ ["Range", '