Kerasでのmixup augmentation
mixupについて調べて実装したのでまとめました。
以下の実装や検証はgithubにあげています。
1. 本記事の概要
- data augmentationの1種であるmixupについて概要をまとめました。
- 2つ以上のサンプルを混合できるよう、mixupの拡張を検討しました。
- pythonでmixupを実装しました。
- ①numpy版
- ②kerasのSequence版
- ③kerasのimageDataAugmentationを組み合わせたSequence版
- ①numpy版
- mixupの効果を検証しました。今回の検証では効果見られず。。実装は間違ってなさそうだけど、効果がでない問題だったのか??
2. mixupの概要
ざっくりと。
mixupはdeep learningで使われるdata augmentationの1種。data augmentationとは、学習データを加工することでデータを水増しする処理のこと。例えば画像分類の問題であれば、画像を回転させたりずらしたりすることで新しいデータを作ってやる。mixupでは以下のようにサンプルを2つ選んで、そのサンプルを線形内挿することで新しいデータを作る。
$\tilde{x} = \lambda x_i + (1-\lambda)x_j, \quad \tilde{y} = \lambda y_i + (1-\lambda)y_j$
$(x_i, y_i), (x_j, y_j)$:ランダムに選ばれたサンプル
$\lambda$:$Beta(\alpha, \alpha)$分布からサンプリングされ0~1の値をとる。
$\alpha$:Beta分布のパラメータ。
αの特徴
- 0に近いほど、$\lambda$は0か1に近い数値となる(Beta分布が鍋型)。
- 1に近いほど、$\lambda$は0~1の間のランダムな数値となる(Beta分布が一様分布)。
- 1より大きいほど、$\lambda$は0.5に近い数値となる(Beta分布が釣り鐘型)。
参考文献
- 原論文
[1710.09412] mixup: Beyond Empirical Risk Minimization
mixup: Beyond Empirical Risk Minimization | OpenReview - ブログ
新たなdata augmentation手法mixupを試してみた - Qiita
[Data Augmentation 第1回] mixup 事始め | 技ラボ
[Data Augmentation 第2回] mixup 少量データでの効果測定(画像編) | 技ラボ
[Data Augmentation 第3回] mixup 少量データでの効果測定(センサーデータ編) | 技ラボ
KerasのImageDataGeneratorを継承してMix-upやRandom Croppingのできる独自のジェネレーターを作る - Qiita
3. mixupの拡張
mixupでは2つのサンプルの線形内挿をとっている。以下のように2つ以上のサンプルを線形内挿できるように拡張する。
$\tilde{x} = \sum_{k=1}^{N}\lambda_k x_k, \quad \tilde{y} = \sum_{k=1}^{N}\lambda_k y_k, \quad \sum_{k=1}^{N}\lambda_k = 1$
$(x_k, y_k)$:ランダムに選ばれたサンプル
$(\lambda_1, ..., \lambda_N)$:$Dirichlet(\alpha, ..., \alpha)$分布からサンプリングされ0~1の値をとる。
$\alpha$:Dirichlet分布のパラメータ。
4.mixupの実装
numpy版
import numpy as np


class MixupGenerator():
    '''Infinite generator yielding mixup-augmented (x, y) batches.

    Mixing weights are drawn from a symmetric Dirichlet distribution so that
    mix_num (>= 2) samples can be blended; for mix_num=2 this reduces to the
    Beta(alpha, alpha) weighting of the original mixup paper.
    '''

    def __init__(self, x, y, batch_size=32, mix_num=2, alpha=0.2):
        self.x = x
        self.y = y
        self.batch_size = batch_size
        self.alpha = alpha
        self.mix_num = mix_num
        #
        self.__sample_num = len(self.x)
        # symmetric Dirichlet parameter vector (alpha, ..., alpha)
        self.__dirichlet_alpha = np.ones(self.mix_num) * self.alpha

    def flow(self):
        '''Yield (mixed_x, mixed_y) batches forever.

        Index rows are reshuffled once per pass over the data; the last batch
        of a pass may be smaller than batch_size.
        '''
        while True:
            indexes = self.get_indexes()
            itr_num = int(np.ceil(self.__sample_num / self.batch_size))
            for i in range(itr_num):
                # BUG FIX: the original bound `batch_indxs` but then indexed
                # with the undefined name `batch_idxs` (NameError).
                batch_idxs = indexes[:, i * self.batch_size:(i + 1) * self.batch_size]
                x, y = self.mixup(self.x[batch_idxs], self.y[batch_idxs])
                yield x, y

    def mixup(self, batch_x, batch_y):
        '''
        Return (mixuped_x, mixuped_y).

        batch_x = self.x[batch_idxs], batch_y = self.y[batch_idxs], where
        batch_idxs has shape (mix_num, batch_size):
        batch_idxs = [
            [idx(0), idx(1), ..., idx(batch_size)],  # indexes of mixed no. 1
            [idx(0), idx(1), ..., idx(batch_size)],  # indexes of mixed no. 2
            ...,
            [idx(0), idx(1), ..., idx(batch_size)]   # indexes of mixed no. mix_num
        ].
        The idx(k)-th entries of rows 1, 2, ..., mix_num are mixed together:
        mixed_x[k] = sum_m batch_x[m, k] * mixup_rate[m, k] (same for y).
        '''
        mix_num = batch_x.shape[0]
        batch_size = batch_x.shape[1]
        # One weight vector per output sample; each row sums to 1.
        # NOTE(review): np.random.dirichlet can fail for very small alpha;
        # the keras Sequence variant uses scipy.stats.dirichlet for that
        # reason — consider doing the same here.
        mixup_rate = np.random.dirichlet(alpha=self.__dirichlet_alpha, size=(batch_size))
        mixup_rate_tr = np.transpose(mixup_rate)  # -> (mix_num, batch_size)
        # Append singleton axes so the weights broadcast over the feature
        # dimensions of x and y.
        reshapelist__mix_rate_tr_x = [mix_num, batch_size] + [1] * (len(batch_x.shape) - 2)
        reshapelist__mix_rate_tr_y = [mix_num, batch_size] + [1] * (len(batch_y.shape) - 2)
        mixup_rate_tr_x = np.reshape(mixup_rate_tr, reshapelist__mix_rate_tr_x)
        mixup_rate_tr_y = np.reshape(mixup_rate_tr, reshapelist__mix_rate_tr_y)
        # Weighted sum over the mix_num axis.
        mixuped_x = np.sum(batch_x * mixup_rate_tr_x, axis=0)
        mixuped_y = np.sum(batch_y * mixup_rate_tr_y, axis=0)
        return mixuped_x, mixuped_y

    def get_indexes(self):
        '''Return an int array of shape (mix_num, sample_num); each row is an
        independent permutation of [0, sample_num).'''
        indexes = np.ones((self.mix_num, self.__sample_num), dtype='int') * np.arange(self.__sample_num)
        for i in range(self.mix_num):
            np.random.shuffle(indexes[i, :])
        return indexes
kerasのSequence版、kerasのimageDataAugmentationを組み合わせたSequence版
ImageDataGeneratorと同じようにkerasのfit_generatorを使えます。
import numpy as np
from keras.utils import Sequence
from keras.preprocessing.image import ImageDataGenerator
import scipy.stats as scst


class MixupSequence(Sequence):
    '''keras Sequence yielding mixup-augmented batches (works with
    fit_generator). Mixing weights are drawn from a symmetric Dirichlet
    distribution so that mix_num samples can be blended at once.'''

    def __init__(self, x, y, batch_size=32, mix_num=2, alpha=0.2):
        self.x = x
        self.y = y
        self.batch_size = batch_size
        self.alpha = alpha
        self.mix_num = mix_num
        #
        self.__sample_num = len(self.x)
        # symmetric Dirichlet parameter vector (alpha, ..., alpha)
        self.__dirichlet_alpha = np.ones(self.mix_num) * self.alpha
        #
        self.__shuffuled_idxes = self.get_indexes()

    def __len__(self):
        # number of batches per epoch
        return int(np.ceil(len(self.x) / self.batch_size))

    def __getitem__(self, idx):
        raw_x, raw_y = self.get_next_batch(idx)
        return self.mixup(raw_x, raw_y)

    def on_epoch_end(self):
        # reshuffle the index table once per epoch
        self.__shuffuled_idxes = self.get_indexes()

    def mixup(self, batch_x, batch_y):
        '''
        Return (mixuped_x, mixuped_y).

        batch_x/batch_y have shape (mix_num, batch_size, ...); entry k of
        each of the mix_num rows is blended into output sample k:
        mixed_x[k] = sum_m batch_x[m, k] * mixup_rate[m, k] (same for y).
        '''
        mix_num, cur_batch = batch_x.shape[0], batch_x.shape[1]
        # np.random.dirichlet misbehaves with very small alpha, so scipy's
        # dirichlet sampler is used instead.
        weights = np.transpose(scst.dirichlet.rvs(alpha=self.__dirichlet_alpha, size=cur_batch))
        # Singleton trailing axes let the weights broadcast over the feature
        # dimensions of x and y.
        w_x = weights.reshape([mix_num, cur_batch] + [1] * (batch_x.ndim - 2))
        w_y = weights.reshape([mix_num, cur_batch] + [1] * (batch_y.ndim - 2))
        return np.sum(batch_x * w_x, axis=0), np.sum(batch_y * w_y, axis=0)

    def get_indexes(self):
        '''Return an int array of shape (mix_num, sample_num); each row is an
        independent permutation of [0, sample_num).'''
        rows = np.ones((self.mix_num, self.__sample_num), dtype='int') * np.arange(self.__sample_num)
        for row in rows:
            np.random.shuffle(row)
        return rows

    def get_next_batch(self, idx):
        lo = idx * self.batch_size
        picked = self.__shuffuled_idxes[:, lo:lo + self.batch_size]
        return self.x[picked], self.y[picked]


class ImageMixupSequence(MixupSequence):
    '''MixupSequence combined with a keras ImageDataGenerator: every mixing
    component is drawn from its own independently shuffled augmented
    image stream.'''

    def __init__(self, x, y, keras_ImageDataGenelator, batch_size=32, mix_num=2, alpha=0.2):
        super().__init__(x, y, batch_size, mix_num, alpha)
        #
        self.keras_img_genelator = keras_ImageDataGenelator
        # one independent augmented stream per mixing component
        self.keras_img_gens = [
            self.keras_img_genelator.flow(
                x=self.x, y=self.y, batch_size=self.batch_size,
                shuffle=True, seed=None)
            for _ in range(self.mix_num)
        ]
        # NOTE: due to name mangling this is a distinct attribute from the
        # parent's index table; batches come from the keras generators.
        self.__shuffuled_idxes = None

    def on_epoch_end(self):
        # the keras generators shuffle themselves; nothing to reshuffle here
        pass

    def get_indexes(self):
        # the parent's index table is unused in this subclass
        pass

    def get_next_batch(self, idx):
        xs, ys = [], []
        min_batch_size = -1
        for img_gen in self.keras_img_gens:
            part_x, part_y = img_gen.__next__()
            xs.append(part_x)
            ys.append(part_y)
            if min_batch_size == -1 or min_batch_size > part_x.shape[0]:
                min_batch_size = part_x.shape[0]
        # Align batch sizes: with multi-thread use the streams may return
        # batches of different lengths, so trim all to the shortest.
        xs = [part[:min_batch_size] for part in xs]
        ys = [part[:min_batch_size] for part in ys]
        return np.array(xs), np.array(ys)
6.mixupの検証
kerasのresnetサンプルを参考にさせてもらい、ResNet20でmixupの効果を検証しました。
keras/cifar10_resnet.py at master · keras-team/keras · GitHub
全然効果があるように見えないのですが。。笑
原論文でも効果がない場合もあったので、この検証もそうだったのかもしれません。実装は問題なくできていそうなのでとりあえず良しとします。(間違ってたらコメントで教えてください。。)
画像以外のタスクでも使ってみたいなー。
7.ソースコード
上記の実装や検証はgithubにあげています。
GitHub - statsu1990/mixup_augmentation: implementation mixup data augmentation with numpy and keras