import numpy as np import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def read_txt_annotations(txt_file, offset=0): with open(txt_file) as f: content = f.readlines() if len(content) == 0: print('empty file: ' + str(txt_file)) return np.asarray([]) if not content[0].strip()[0].isdigit(): first_line = content.pop(0) # print("dropping first line of "+txt_file+" because it was: "+first_line) if content is None or len(content) <= 0: print('nothing left for file: '+str(txt_file)) return np.asarray([]) num_cols = len(content[0].strip().split()) assert num_cols >= 2 times = np.ones((len(content), num_cols)) * -1 for i_line, line in enumerate(content): parts = line.split() time = float(parts[0]) times[i_line][0] = time if len(parts) < 2: logger.warning("No label at line: " + str(i_line) + " in file: " + txt_file) continue temp_type = parts[1].strip() label = float(temp_type)+offset times[i_line][1] = label if len(parts) < 3: logger.debug("No velocity at line: " + str(i_line) + " in file: " + txt_file) else: if num_cols < 3: logger.warning("more columns detected than in first row for row nr %d: %s"%(i_line, line)) times[i_line][2] = float(parts[2].strip()) return times def clean_annotation(content): # if events remain, look for repetitions and remove them: if len(content) > 0: content = np.asarray(content) times = content[np.argsort(content[:, 1])] times = times[times[:, 0].argsort(kind='mergesort')] # we dont care about performance so much, but need a stable sort idx_in = 1 idx_out = 1 content[0, :] = times[0, :] while idx_in < times.shape[0]: content[idx_out, :] = times[idx_in, :] if all(np.equal(times[idx_in - 1, :2], (times[idx_in, :2]))): # time and inst equal -> remove (we dont care about different velocities) # print('### cur file: ' + cur_file + ' time: ' + str(times[idx_in]) + ' -- ' + str(times[idx_in - 1])) idx_in += 1 else: idx_in += 1 idx_out += 1 content = content[:idx_out, :] return content def posprocess_annotations(times, inst_map=None, offset=0, num_classes=100): content = [] if len(times) <= 0: return content times = times[times[:, 0].argsort(kind='mergesort')] num_cols = times.shape[1] assert num_cols > 1 if num_cols < 3: times = np.hstack((times, np.ones((times.shape[0], 3-num_cols))*-1)) assert num_cols == 3 # last_entry = [-1 for _ in range(num_cols)] for (time, label, velocity) in times: if inst_map is not None: if label in inst_map: label = inst_map[label] else: logger.debug("No label mapping found for: %s" % label) label = -1 new_label = label - offset if 0 <= new_label < num_classes: # cur_entry = [time, new_label, velocity] # if not all([cur_entry[i] == last_entry[i] for i in range(num_cols)]): # skip equal entries caused by mapping # content.append(cur_entry) content.append([time, new_label, velocity]) # last_entry = cur_entry else: logger.debug("No valid label: %s, ignoring entry" % new_label) return clean_annotation(content) def compute_target_array_from_times(times, fps, num_frames, num_targets, offset=0, soft_targets=False, statistics=None): if len(times) > 0 and np.max(times, 0)[0] * fps > num_frames: logging.warning("Maximum time is larger than number of samples - cutting times.") if len(times) > 0 and np.max(times, 0)[1] >= num_targets: logging.warning("Maximum label index is larger than num_targets - cutting labels.") targets = np.zeros((num_frames, num_targets)) times_out_cnt = 0 for entry_nr, time_entry in enumerate(times): time = time_entry[0] + offset time_idx = int(time*fps) inst_idx = int(time_entry[1]) if 0 <= inst_idx < num_targets: if time_idx < num_frames: targets[time_idx, inst_idx] = 1 if soft_targets: if time_idx > 0: targets[time_idx-1, inst_idx] = 0.5 if time_idx < num_frames-1: targets[time_idx+1, inst_idx] = 0.5 else: # logging.warning('Time idx (%f / %d) out of bounds (%d) at entry number: %d ' % (time, time_idx, num_frames, entry_nr)) times_out_cnt += 1 else: logging.warning('Label idx (%d) out of range at entry number: %d' % (inst_idx, entry_nr)) if statistics is not None: statistics.total_pruned += 1 return targets, times_out_cnt def write_txt_annotation(filename, data): with open(filename, 'w') as f: for entry in data: f.write("%3.5f \t %d\n" % (entry[0], entry[1])) def write_3c_txt_annotation(filename, data): with open(filename, 'w') as f: for entry in data: f.write("%3.5f \t %d \t %f \n" % (entry[0], entry[1], entry[2]))