Richard Vogl / piano_transcription / Commits / 58580d56

Commit 58580d56 authored Jun 15, 2018 by Richard Vogl
first commit

Changes: 15 files
.gitignore
0 → 100644
.idea
piano_transcription/output*
*.pyc
*.json
*.pkl
*.eps
*.pdf
*.pgf
piano_transcription/__init__.py
0 → 100644
import os

PROJECT_PATH = os.path.abspath(os.path.dirname(__file__))
OUTPUT_PATH = os.path.join(PROJECT_PATH, 'output')

if not os.path.exists(OUTPUT_PATH):
    os.makedirs(OUTPUT_PATH)
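For orientation, a minimal usage sketch (the 'model.pkl' filename is hypothetical, not part of the repository): importing the package resolves PROJECT_PATH and creates the output directory as a side effect, so OUTPUT_PATH can be used directly when building result paths.

    import os
    from piano_transcription import OUTPUT_PATH  # importing creates the output directory if missing

    model_file = os.path.join(OUTPUT_PATH, 'model.pkl')  # hypothetical output file name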
piano_transcription/data/__init__.py
0 → 100644
import numpy as np

# TODO remove this and load and use real data!
FEAT_SIZE = 300
OUT_SIZE = 80
FULL_SEQ_LEN = 2000
NUM_SAMPLES_TRAIN = 100
NUM_SAMPLES_VALID = 10


def generate_random_sample():
    feat = np.random.rand(FULL_SEQ_LEN, FEAT_SIZE)
    targ = np.random.rand(FULL_SEQ_LEN, OUT_SIZE)
    return feat, targ


def load_data(split_nr):
    feat_train = []
    targ_train = []
    feat_valid = []
    targ_valid = []

    for sample_idx in range(NUM_SAMPLES_TRAIN):
        feat, targ = generate_random_sample()
        feat_train.append(feat)
        targ_train.append(targ)

    for sample_idx in range(NUM_SAMPLES_VALID):
        feat, targ = generate_random_sample()
        feat_valid.append(feat)
        targ_valid.append(targ)

    return feat_train, targ_train, feat_valid, targ_valid, FULL_SEQ_LEN, FEAT_SIZE, OUT_SIZE
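A short sketch of how this placeholder loader is meant to be consumed; the split index 0 is arbitrary and the returned data is random until real features replace it (see the TODO above):

    from piano_transcription.data import load_data

    feat_train, targ_train, feat_valid, targ_valid, seq_len, feat_size, out_size = load_data(split_nr=0)
    print(len(feat_train))        # 100 training sequences
    print(feat_train[0].shape)    # (2000, 300) random features per sequence
    print(targ_valid[0].shape)    # (2000, 80) random targets per sequence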
piano_transcription/data/annotations.py
0 → 100644
import numpy as np
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def read_txt_annotations(txt_file, offset=0):
    with open(txt_file) as f:
        content = f.readlines()

    if len(content) == 0:
        print('empty file: ' + str(txt_file))
        return np.asarray([])

    if not content[0].strip()[0].isdigit():
        first_line = content.pop(0)
        # print("dropping first line of "+txt_file+" because it was: "+first_line)

    if content is None or len(content) <= 0:
        print('nothing left for file: ' + str(txt_file))
        return np.asarray([])

    num_cols = len(content[0].strip().split())
    assert num_cols >= 2
    times = np.ones((len(content), num_cols)) * -1

    for i_line, line in enumerate(content):
        parts = line.split()
        time = float(parts[0])
        times[i_line][0] = time

        if len(parts) < 2:
            logger.warning("No label at line: " + str(i_line) + " in file: " + txt_file)
            continue

        temp_type = parts[1].strip()
        label = float(temp_type) + offset
        times[i_line][1] = label

        if len(parts) < 3:
            logger.debug("No velocity at line: " + str(i_line) + " in file: " + txt_file)
        else:
            if num_cols < 3:
                logger.warning("more columns detected than in first row for row nr %d: %s" % (i_line, line))
            times[i_line][2] = float(parts[2].strip())

    return times


def clean_annotation(content):
    # if events remain, look for repetitions and remove them:
    if len(content) > 0:
        content = np.asarray(content)
        times = content[np.argsort(content[:, 1])]
        times = times[times[:, 0].argsort(kind='mergesort')]  # we dont care about performance so much, but need a stable sort

        idx_in = 1
        idx_out = 1
        content[0, :] = times[0, :]
        while idx_in < times.shape[0]:
            content[idx_out, :] = times[idx_in, :]
            if all(np.equal(times[idx_in - 1, :2], (times[idx_in, :2]))):
                # time and inst equal -> remove (we dont care about different velocities)
                # print('### cur file: ' + cur_file + ' time: ' + str(times[idx_in]) + ' -- ' + str(times[idx_in - 1]))
                idx_in += 1
            else:
                idx_in += 1
                idx_out += 1

        content = content[:idx_out, :]

    return content


def posprocess_annotations(times, inst_map=None, offset=0, num_classes=100):
    content = []

    if len(times) <= 0:
        return content

    times = times[times[:, 0].argsort(kind='mergesort')]
    num_cols = times.shape[1]
    assert num_cols > 1
    if num_cols < 3:
        times = np.hstack((times, np.ones((times.shape[0], 3 - num_cols)) * -1))
    assert num_cols == 3

    # last_entry = [-1 for _ in range(num_cols)]
    for (time, label, velocity) in times:
        if inst_map is not None:
            if label in inst_map:
                label = inst_map[label]
            else:
                logger.debug("No label mapping found for: %s" % label)
                label = -1

        new_label = label - offset
        if 0 <= new_label < num_classes:
            # cur_entry = [time, new_label, velocity]
            # if not all([cur_entry[i] == last_entry[i] for i in range(num_cols)]):  # skip equal entries caused by mapping
            #     content.append(cur_entry)
            content.append([time, new_label, velocity])
            # last_entry = cur_entry
        else:
            logger.debug("No valid label: %s, ignoring entry" % new_label)

    return clean_annotation(content)


def compute_target_array_from_times(times, fps, num_frames, num_targets, offset=0, soft_targets=False, statistics=None):
    if len(times) > 0 and np.max(times, 0)[0] * fps > num_frames:
        logging.warning("Maximum time is larger than number of samples - cutting times.")
    if len(times) > 0 and np.max(times, 0)[1] >= num_targets:
        logging.warning("Maximum label index is larger than num_targets - cutting labels.")

    targets = np.zeros((num_frames, num_targets))
    times_out_cnt = 0

    for entry_nr, time_entry in enumerate(times):
        time = time_entry[0] + offset
        time_idx = int(time * fps)
        inst_idx = int(time_entry[1])

        if 0 <= inst_idx < num_targets:
            if time_idx < num_frames:
                targets[time_idx, inst_idx] = 1
                if soft_targets:
                    if time_idx > 0:
                        targets[time_idx - 1, inst_idx] = 0.5
                    if time_idx < num_frames - 1:
                        targets[time_idx + 1, inst_idx] = 0.5
            else:
                # logging.warning('Time idx (%f / %d) out of bounds (%d) at entry number: %d ' % (time, time_idx, num_frames, entry_nr))
                times_out_cnt += 1
        else:
            logging.warning('Label idx (%d) out of range at entry number: %d' % (inst_idx, entry_nr))
            if statistics is not None:
                statistics.total_pruned += 1

    return targets, times_out_cnt


def write_txt_annotation(filename, data):
    with open(filename, 'w') as f:
        for entry in data:
            f.write("%3.5f\t%d\n" % (entry[0], entry[1]))


def write_3c_txt_annotation(filename, data):
    with open(filename, 'w') as f:
        for entry in data:
            f.write("%3.5f\t%d\t%f\n" % (entry[0], entry[1], entry[2]))
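To illustrate how these helpers chain together, here is a hedged sketch of a typical round trip. The file name 'annotations.txt' and the offset/fps/num_frames values are made-up examples (chosen for an 88-key piano setting), not values taken from the repository:

    from piano_transcription.data.annotations import (read_txt_annotations, posprocess_annotations,
                                                      compute_target_array_from_times)

    raw = read_txt_annotations('annotations.txt')      # rows of [time, label, velocity]
    events = posprocess_annotations(raw, inst_map=None,
                                    offset=21,          # e.g. shift MIDI notes 21-108 to key indices 0-87
                                    num_classes=88)
    targets, n_cut = compute_target_array_from_times(events, fps=100, num_frames=6000,
                                                     num_targets=88, soft_targets=True)
    # targets: (6000, 88) frame-level onset matrix; n_cut: events falling after the last frame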
piano_transcription/data/data_pools.py
0 → 100644
import numpy as np


class BatchIterator(object):
    """
    Prototype for batch iterator
    """

    def __init__(self, batch_size, re_iterate=1, prepare=None, k_samples=None, shuffle=True):
        self.batch_size = batch_size

        if prepare is None:
            def prepare(*data):
                return data
        self.prepare = prepare

        self.re_iterate = re_iterate
        self.k_samples = k_samples
        self.shuffle = shuffle
        self.epoch_counter = 0
        self.n_epochs = None

    def __call__(self, data_pool):
        self.data_pool = data_pool
        if self.k_samples is None or self.k_samples > self.data_pool.shape[0]:
            self.k_samples = self.data_pool.shape[0]
        self.n_batches = self.re_iterate * (self.k_samples // self.batch_size)
        self.n_epochs = self.data_pool.shape[0] // self.k_samples
        if self.shuffle:
            self.data_pool.shuffle()
        return self

    def __iter__(self):

        # compute current epoch index
        idx_epoch = np.mod(self.epoch_counter, self.n_epochs)

        # reiterate entire data-set
        for _ in xrange(self.re_iterate):

            # use only k samples per epoch
            for i_b in xrange((self.k_samples + self.batch_size - 1) / self.batch_size):

                # slice batch data
                start = i_b * self.batch_size + idx_epoch * self.k_samples
                stop = (i_b + 1) * self.batch_size + idx_epoch * self.k_samples
                stop = np.min([stop, self.data_pool.shape[0]])
                sl = slice(start, stop)
                xb = self.data_pool[sl]

                # get missing samples
                n_sampels = xb[0].shape[0]
                if n_sampels < self.batch_size:
                    n_missing = self.batch_size - n_sampels
                    x_con = self.data_pool[0:n_missing]
                    for i_input in xrange(len(xb)):
                        xb[i_input] = np.concatenate((xb[i_input], x_con[i_input]))

                yield self.transform(xb)

            # increase epoch counter
            self.epoch_counter += 1

            # shuffle train data after full set iteration
            if self.shuffle and (idx_epoch + 1) == self.n_epochs:
                self.data_pool.shuffle()

    def transform(self, data):
        return self.prepare(*data)


class UniversalRegressionDataPool(object):
    """ Regression data pool for RNNs, ConvNets and Convolutional RNNs """

    def __init__(self, sequences, target_sequences, sub_sequence_length=1, data_context=1,
                 step_size=1, central_target=True, do_shuffle=True):
        """ Constructor

        Parameters
        ----------
        sequences : list
            List of data sequences (input to your network)
        target_sequences : list
            List of target sequences (target of your network prediction). Must be in line with sequences.
        sub_sequence_length : int
            Number of time steps for each training example
        data_context : int
            Temporal context for each time step. This is required for convolution and convolution RNNs.
            Has to be an odd number 1, 3, 5, ...
        step_size : int
            Step size for producing the sub sequences
        central_target : bool
            If true only the central target is returned for prediction. Set this to true for conv-Nets and conv-RNNs.
        do_shuffle : bool
            If true data gets shuffled on initialization
        """
        self.sequences = sequences
        self.target_sequences = target_sequences
        self.sub_seqence_length = sub_sequence_length
        self.data_context = data_context
        self.step_size = step_size
        self.central_target = central_target
        self.half_context = (self.data_context - 1) // 2
        self.do_shuffle = do_shuffle

        self.n_sequences = len(self.sequences)
        self.train_items = None
        self.shape = None

        self.prepare_train_items()

        if self.do_shuffle:
            self.shuffle()

    def shuffle(self):
        rand_idx = np.random.permutation(self.shape[0])
        self.train_items = self.train_items[rand_idx]

    def prepare_train_items(self):
        seq_lengths = [sequence.shape[0] for sequence in self.sequences]
        n_items = sum([int(np.ceil(max(0, seq_len - self.sub_seqence_length - self.data_context + 1) / float(self.step_size)))
                       for seq_len in seq_lengths])
        self.train_items = np.zeros((n_items, 2), dtype=np.int)

        out_idx = 0
        for i_seq in xrange(self.n_sequences):
            sequence = self.sequences[i_seq]
            target = self.target_sequences[i_seq]
            assert len(sequence) == len(target)

            start_idx = self.half_context
            stop_idx = sequence.shape[0] - self.sub_seqence_length - self.half_context
            for i_step in xrange(start_idx, stop_idx, self.step_size):
                self.train_items[out_idx, :] = np.asarray(([i_seq, i_step]), dtype=np.int).reshape((1, 2))
                out_idx += 1

        self.shape = [self.train_items.shape[0]]
        assert n_items == self.shape[0]

    def __getitem__(self, key):

        # get batch
        if key.__class__ != slice:
            key = slice(key, key + 1)

        # fix out of bounds
        key = slice(key.start, np.min([self.shape[0], key.stop]))

        # prepare list of files
        X = []
        Y = []
        for item_id in range(key.start, key.stop):
            seq_id, step_idx = self.train_items[item_id]

            # get sequences
            seq = self.sequences[seq_id]
            targ = self.target_sequences[seq_id]

            seq_stack = []
            targ_stack = []
            for i_sub_seq in xrange(self.sub_seqence_length):

                # define time steps
                t0 = step_idx + i_sub_seq - self.half_context
                t1 = t0 + self.data_context

                # get sequence window
                seq_stack.append(seq[t0:t1])

                if self.central_target:
                    center_idx = t0 + self.half_context
                    targ_stack.append(targ[center_idx:center_idx + 1])
                else:
                    targ_stack.append(targ[t0:t1])

            seq_stack = np.asarray(seq_stack)
            targ_stack = np.asarray(targ_stack)

            X.append(seq_stack)
            Y.append(targ_stack)

        X = np.asarray(X, dtype=np.float32)
        Y = np.asarray(Y, dtype=np.float32)

        return [X, Y]


if __name__ == '__main__':
    """ main """

    # create dummy data
    sequences = []
    target_sequences = []
    for i in xrange(10):
        seq_len = np.random.randint(low=200, high=500)
        sequences.append(np.random.randn(seq_len, 23))
        target_sequences.append(np.random.randn(seq_len, 3))

    data_pool = UniversalRegressionDataPool(sequences, target_sequences, sub_sequence_length=15,
                                            data_context=3, step_size=1, central_target=True)

    x, y = data_pool[0:10]
    print x.shape, y.shape

    for i in xrange(data_pool.shape[0]):
        x, y = data_pool[i:i + 1]
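The two classes are designed to be combined: the pool turns full sequences into indexable (context window, target) items, and the iterator slices the pool into mini-batches. A minimal sketch with made-up dimensions (23 feature bins, 3 targets, 25-frame context); the dummy data here is illustrative only:

    import numpy as np
    from piano_transcription.data.data_pools import UniversalRegressionDataPool, BatchIterator

    # dummy data: 10 recordings, 300 frames each, 23 feature bins and 3 targets per frame
    sequences = [np.random.randn(300, 23) for _ in range(10)]
    target_sequences = [np.random.randn(300, 3) for _ in range(10)]

    # pool of 25-frame context windows, predicting only the central frame
    pool = UniversalRegressionDataPool(sequences, target_sequences,
                                       sub_sequence_length=1, data_context=25,
                                       step_size=1, central_target=True)

    # the iterator is bound to a pool by calling it; prepare=None leaves batches untouched
    iterator = BatchIterator(batch_size=32, k_samples=None, shuffle=True)
    for x_batch, y_batch in iterator(pool):
        pass  # x_batch: (32, 1, 25, 23), y_batch: (32, 1, 1, 3)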
piano_transcription/data/features.py
0 → 100644
from madmom.audio.spectrogram import LogarithmicFilteredSpectrogram, SpectrogramDifference, LogarithmicFilterbank
import numpy as np


def extract_features(audiofile):
    num_channels = 1
    sample_rate = 44100
    frame_sizes = [1024, 2048, 4096]
    fps = 100
    num_bands = 12
    fmin = 30
    fmax = 17000
    norm_filters = True
    start_silence = 0
    diff = True
    diff_ratio = 0.5
    positive_diffs = True

    spectrograms = []
    for frame_size in frame_sizes:
        spectrogram = LogarithmicFilteredSpectrogram(
            audiofile, num_channels=num_channels, sample_rate=sample_rate,
            filterbank=LogarithmicFilterbank, frame_size=frame_size, fps=fps,
            num_bands=num_bands, fmin=fmin, fmax=fmax, norm_filters=norm_filters,
            start_silence=start_silence)

        if diff:
            spectrogram_diff = SpectrogramDifference(spectrogram, diff_ratio=diff_ratio,
                                                     positive_diffs=positive_diffs,
                                                     stack_diffs=np.hstack)
            spectrogram = np.hstack((spectrogram, spectrogram_diff))

        spectrograms.append(spectrogram)

    return np.hstack(spectrograms)
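A hedged usage sketch; 'piece.wav' is a hypothetical path, and the exact number of feature bins depends on the madmom filterbank settings above (three frame sizes, each stacked with its positive spectral difference):

    from piano_transcription.data.features import extract_features

    feats = extract_features('piece.wav')
    print(feats.shape)  # (num_frames, num_bins) at 100 frames per second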
piano_transcription/layers/__init__.py
0 → 100644
piano_transcription/layers/gradient.py
0 → 100644
import theano
import lasagne
from lasagne.layers.base import Layer

__all__ = [
    "GradientClipLayer",
]


class GradientClipLayer(Layer):
    """
    Apply gradient clipping to output layer
    """

    def __init__(self, incoming, grad_clipping, nonlinearity=lasagne.nonlinearities.identity, **kwargs):
        super(GradientClipLayer, self).__init__(incoming, **kwargs)
        self.grad_clipping = grad_clipping
        self.nonlinearity = nonlinearity

    def get_output_for(self, input, **kwargs):
        clipped = theano.gradient.grad_clip(input, -self.grad_clipping, self.grad_clipping)
        return self.nonlinearity(clipped)
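For context, a small sketch showing where such a layer could sit in a Lasagne stack (the layer sizes and clip value are arbitrary): on the forward pass it applies only the given nonlinearity, while the gradient flowing back through it is clipped to the given range.

    import lasagne
    from piano_transcription.layers.gradient import GradientClipLayer

    l_in = lasagne.layers.InputLayer(shape=(None, 100))
    l_hid = lasagne.layers.DenseLayer(l_in, num_units=64,
                                      nonlinearity=lasagne.nonlinearities.identity)
    # clip gradients to [-1, 1] before the sigmoid output nonlinearity
    l_out = GradientClipLayer(l_hid, grad_clipping=1.0,
                              nonlinearity=lasagne.nonlinearities.sigmoid)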
piano_transcription/models/__init__.py
0 → 100644
piano_transcription/models/cnn_1.py
0 → 100644
#!/usr/bin/env python

import numpy as np
import theano
import lasagne
from lasagne.layers.dnn import Conv2DDNNLayer as Conv2DLayer
from lasagne.layers.dnn import batch_norm_dnn as batch_norm
from lasagne.layers.dnn import MaxPool2DDNNLayer as MaxPool2DLayer
from lasagne.layers import DropoutLayer, FlattenLayer, DenseLayer

from piano_transcription.data.data_pools import BatchIterator

INI_LEARNING_RATE = np.float32(0.0005)
BATCH_SIZE = 100
MAX_EPOCHS = 1000
PATIENCE = 4
L2 = 0.0004

SPEC_BINS = 168
OUT_LEN = 88
SPEC_CONTEXT = 25
STEP_SIZE = 1
SEQ_LENGTH = 1
CENTRAL_TARGET = True

init_conv = lasagne.init.HeNormal

MAX_PRED_SIZE = 100
USE_BATCHED_PREDICT = True

dense_layers = 2
dense_units = 256


def get_valid_batch_iterator():
    def batch_iterator(batch_size, k_samples, shuffle):
        return BatchIterator(batch_size=batch_size, prepare=prepare, k_samples=k_samples, shuffle=shuffle)
    return batch_iterator


def get_train_batch_iterator():
    def batch_iterator(batch_size, k_samples, shuffle):
        return BatchIterator(batch_size=batch_size, prepare=prepare_train, k_samples=k_samples, shuffle=shuffle)
    return batch_iterator


def predict(net, X, max_seq_len, out_len):
    seq_len, feat_len = X.shape
    pad_width = (SPEC_CONTEXT - 1) / 2
    x_b_p = np.pad(X, ((pad_width, pad_width), (0, 0)), 'constant')

    step_size = 1
    indices = np.arange(pad_width, x_b_p.shape[0] - pad_width, step_size).astype(np.int)
    n_seq_pred = len(indices)

    shape = [n_seq_pred, 1, SPEC_CONTEXT, feat_len]
    X_pred = np.zeros(shape, dtype=theano.config.floatX)
    for o_idx, x_idx in enumerate(indices):
        X_pred[o_idx, 0, :, :] = x_b_p[x_idx - pad_width:x_idx + pad_width + 1, :]

    if USE_BATCHED_PREDICT:
        p_b = np.zeros((seq_len, out_len))
        n_batches = int(np.ceil(X_pred.shape[0] / float(MAX_PRED_SIZE)))
        for batch in xrange(n_batches):
            i0 = batch * MAX_PRED_SIZE
            i1 = i0 + MAX_PRED_SIZE
            i1o = min(i1, seq_len)
            p_b[i0:i1o] = net.predict_proba(X_pred[i0:i1])[0:(i1o - i0)]
    else:
        p_b = net.predict_proba(X_pred)[:seq_len]

    return p_b


def prepare(x, y):
    y = np.squeeze(y)
    return x, y


def prepare_train(x, y):
    x, y = prepare(x, y)
    return x, y


def build_eval_model(max_seq_len, feat_len, out_len):
    if USE_BATCHED_PREDICT:
        return build_model(batch_size=MAX_PRED_SIZE, seq_length=1, feat_len=feat_len, out_len=out_len)
    else:
        return build_model(batch_size=max_seq_len, seq_length=1, feat_len=feat_len, out_len=out_len)


def build_model(batch_size=BATCH_SIZE, seq_length=None, feat_len=SPEC_BINS, out_len=OUT_LEN):
    """ Compile net architecture """
    nonlin = lasagne.nonlinearities.rectify

    # --- input layers ---
    l_in = lasagne.layers.InputLayer(shape=(batch_size, 1, SPEC_CONTEXT, feat_len))

    # --- conv layers ---