annotations.py 5.29 KB
Newer Older
Richard Vogl's avatar
Richard Vogl committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import numpy as np
import logging

# Configure the root logger with INFO as its level when this module is
# imported, then create the module-level logger (named after this module)
# that the functions below use for their warning/debug messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def read_txt_annotations(txt_file, offset=0):
    with open(txt_file) as f:
        content = f.readlines()

    if len(content) == 0:
        print('empty file: ' + str(txt_file))
        return np.asarray([])

    if not content[0].strip()[0].isdigit():
        first_line = content.pop(0)
        # print("dropping first line of "+txt_file+" because it was: "+first_line)

    if content is None or len(content) <= 0:
        print('nothing left for file: '+str(txt_file))
        return np.asarray([])

    num_cols = len(content[0].strip().split())
    assert num_cols >= 2

    times = np.ones((len(content), num_cols)) * -1
    for i_line, line in enumerate(content):
        parts = line.split()
        time = float(parts[0])
        times[i_line][0] = time

        if len(parts) < 2:
            logger.warning("No label at line: " + str(i_line) + " in file: " + txt_file)
            continue

        temp_type = parts[1].strip()
        label = float(temp_type)+offset
        times[i_line][1] = label

        if len(parts) < 3:
            logger.debug("No velocity at line: " + str(i_line) + " in file: " + txt_file)
        else:
            if num_cols < 3:
                logger.warning("more columns detected than in first row for row nr %d: %s"%(i_line, line))
            times[i_line][2] = float(parts[2].strip())

    return times


def clean_annotation(content):
    """Sort annotation rows and drop repeated (time, label) events.

    Rows are ordered by time (column 0) and, for equal times, by label
    (column 1). Of several rows sharing the same time and label only the
    first one in input order is kept; differing values in further columns
    (e.g. velocity) are deliberately ignored for the comparison.

    The input is never modified; a new array is returned.

    Args:
        content: Sequence or 2-D array of rows with at least two columns
            (time, label).

    Returns:
        A new 2-D array with the sorted, de-duplicated rows, or ``content``
        itself if it is empty.
    """
    if len(content) > 0:
        events = np.asarray(content)
        # lexsort treats the LAST key as the primary one: sort by time, then
        # by label. It is stable, so equal (time, label) rows keep their
        # input order and the surviving duplicate is deterministic.
        order = np.lexsort((events[:, 1], events[:, 0]))
        events = events[order]
        # Keep the first row, plus every row whose (time, label) differs
        # from that of its predecessor.
        keep = np.ones(events.shape[0], dtype=bool)
        keep[1:] = np.any(events[1:, :2] != events[:-1, :2], axis=1)
        content = events[keep]
    return content


def posprocess_annotations(times, inst_map=None, offset=0, num_classes=100):
    """Map, shift and filter annotation labels, then remove duplicates.

    The rows are stably sorted by time, padded with ``-1`` to three columns
    (time, label, velocity) if needed, and processed one by one:
    the label is optionally translated through ``inst_map`` (labels without
    a mapping become ``-1``), ``offset`` is subtracted, and only rows whose
    resulting label lies in ``[0, num_classes)`` are kept. The kept rows are
    finally passed through ``clean_annotation``.

    Args:
        times: 2-D array-like with two or three columns per row.
        inst_map: Optional mapping from original label to new label.
        offset: Value subtracted from each (mapped) label.
        num_classes: Exclusive upper bound for valid labels.

    Returns:
        The result of ``clean_annotation`` on the kept
        ``[time, label, velocity]`` rows; an empty list if ``times`` is empty.

    Raises:
        AssertionError: If the rows have fewer than two or more than three
            columns.
    """
    content = []
    if len(times) <= 0:
        return content
    times = np.asarray(times)
    times = times[times[:, 0].argsort(kind='mergesort')]
    num_cols = times.shape[1]
    assert num_cols > 1
    if num_cols < 3:
        # Fill the missing velocity column with the "no value" marker -1.
        padding = np.full((times.shape[0], 3 - num_cols), -1.0)
        times = np.hstack((times, padding))
    # Check the width AFTER padding; the pre-padding value would reject
    # every two-column input.
    assert times.shape[1] == 3
    for (time, label, velocity) in times:
        if inst_map is not None:
            if label in inst_map:
                label = inst_map[label]
            else:
                logger.debug("No label mapping found for: %s" % label)
                label = -1
        new_label = label - offset
        if 0 <= new_label < num_classes:
            content.append([time, new_label, velocity])
        else:
            logger.debug("No valid label: %s, ignoring entry" % new_label)

    return clean_annotation(content)


def compute_target_array_from_times(times, fps, num_frames, num_targets, offset=0, soft_targets=False, statistics=None):
    """Convert annotation rows into a frame-wise target matrix.

    For every row, ``int((row[0] + offset) * fps)`` is used as frame index
    and ``int(row[1])`` as target (column) index; the corresponding cell is
    set to 1. Rows whose frame index lies outside ``[0, num_frames)`` are
    counted and skipped, rows whose label lies outside ``[0, num_targets)``
    are logged and skipped.

    Args:
        times: Sequence or 2-D array of rows ``(time, label, ...)``.
        fps: Factor converting a time value into a frame index.
        num_frames: Number of rows of the returned matrix.
        num_targets: Number of columns of the returned matrix.
        offset: Value added to every time before conversion to a frame.
        soft_targets: If true, the frames directly before and after an
            event are raised to at least 0.5.
        statistics: Optional object whose ``total_pruned`` attribute is
            incremented for every row with an out-of-range label.

    Returns:
        A tuple ``(targets, times_out_cnt)``: the ``(num_frames,
        num_targets)`` float matrix and the number of rows skipped because
        their frame index was out of range.
    """
    if len(times) > 0:
        column_max = np.max(times, 0)
        if column_max[0] * fps > num_frames:
            logging.warning("Maximum time is larger than number of samples - cutting times.")
        if column_max[1] >= num_targets:
            logging.warning("Maximum label index is larger than num_targets - cutting labels.")

    targets = np.zeros((num_frames, num_targets))
    times_out_cnt = 0

    for entry_nr, time_entry in enumerate(times):
        time_idx = int((time_entry[0] + offset) * fps)
        inst_idx = int(time_entry[1])

        if not 0 <= inst_idx < num_targets:
            logging.warning('Label idx (%d) out of range at entry number: %d', inst_idx, entry_nr)
            if statistics is not None:
                statistics.total_pruned += 1
            continue

        # A negative frame index must be rejected explicitly; otherwise it
        # would silently wrap around and mark a frame at the end.
        if not 0 <= time_idx < num_frames:
            times_out_cnt += 1
            continue

        targets[time_idx, inst_idx] = 1
        if soft_targets:
            for neighbour in (time_idx - 1, time_idx + 1):
                if 0 <= neighbour < num_frames:
                    # Only raise the neighbour, never lower it: an event on
                    # an adjacent frame must keep its full target of 1.
                    targets[neighbour, inst_idx] = max(targets[neighbour, inst_idx], 0.5)

    return targets, times_out_cnt


def write_txt_annotation(filename, data):
    """Write the first two values of every entry in *data* to *filename*.

    Each entry becomes one line: the first value formatted with five
    decimals, a tab surrounded by spaces, and the second value as integer.
    An existing file is overwritten.
    """
    line_format = "%3.5f \t %d\n"
    with open(filename, 'w') as out_file:
        out_file.writelines(line_format % (row[0], row[1]) for row in data)


def write_3c_txt_annotation(filename, data):
    """Write the first three values of every entry in *data* to *filename*.

    Each entry becomes one line holding the first value with five decimals,
    the second value as integer and the third value as float, separated by
    tabs surrounded by spaces. An existing file is overwritten.
    """
    line_format = "%3.5f \t %d \t %f \n"
    with open(filename, 'w') as out_file:
        out_file.writelines(line_format % (row[0], row[1], row[2]) for row in data)