| import tensorflow as tf |
| from tensorflow.keras import layers, Model |
| import numpy as np |
|
|
class DeCorrelationLoss(tf.keras.layers.Layer):
    """DeCov activity regularizer (Cogswell et al., 2016).

    Adds ``0.5 * lambda_decov * ||C - diag(C)||_F^2`` to the model losses,
    where ``C`` is the sample covariance matrix of the incoming batch of
    activations. Only off-diagonal entries are penalized, pushing hidden
    units toward decorrelated representations. The input tensor itself is
    returned unchanged, so the layer is a transparent pass-through.
    """

    def __init__(self, lambda_decov=1e-4, **kwargs):
        super(DeCorrelationLoss, self).__init__(**kwargs)
        # Scaling coefficient applied to the decorrelation penalty.
        self.lambda_decov = lambda_decov

    def build(self, input_shape):
        super(DeCorrelationLoss, self).build(input_shape)

    def call(self, inputs):
        # Number of samples in the batch, as float for the division below.
        n = tf.cast(tf.shape(inputs)[0], tf.float32)

        # Center each feature across the batch dimension.
        centered = inputs - tf.reduce_mean(inputs, axis=0, keepdims=True)

        # Sample covariance matrix, shape (features, features).
        cov = tf.matmul(centered, centered, transpose_a=True) / (n - 1)

        # Keep only cross-feature terms: subtract the diagonal.
        off_diag = cov - tf.linalg.diag(tf.linalg.diag_part(cov))

        penalty = 0.5 * tf.reduce_sum(tf.square(off_diag))

        # Registered as an activity loss; summed into model.losses.
        self.add_loss(self.lambda_decov * penalty)
        return inputs
|
|
class MalConv(Model):
    """MalConv byte-level malware classifier (Raff et al. architecture).

    Raw byte sequences (integers 0-255) are embedded, run through a gated
    1-D convolution — a ReLU branch modulated element-wise by a sigmoid
    branch — then global-max pooled and classified by a dense head with a
    sigmoid output. Optionally applies the DeCov decorrelation penalty to
    the fully connected activations.

    Args:
        max_input_length: Nominal maximum byte-sequence length (stored for
            reference; the embedding itself accepts variable lengths).
        embedding_size: Dimensionality of the per-byte embedding.
        filter_size: Convolution kernel width in bytes.
        stride: Convolution stride (non-overlapping windows when equal to
            ``filter_size``).
        num_filters: Number of filters in each convolution branch.
        fc_size: Width of the fully connected layer.
        use_decov: Whether to attach the DeCov regularization layer.
        lambda_decov: Weight of the DeCov penalty.
    """

    def __init__(self,
                 max_input_length=2_000_000,
                 embedding_size=8,
                 filter_size=500,
                 stride=500,
                 num_filters=128,
                 fc_size=128,
                 use_decov=True,
                 lambda_decov=1e-4,
                 **kwargs):
        super(MalConv, self).__init__(**kwargs)

        self.max_input_length = max_input_length
        self.use_decov = use_decov

        # One embedding vector per possible byte value (0-255).
        self.embedding = layers.Embedding(
            input_dim=256,
            output_dim=embedding_size,
            input_length=None,
            mask_zero=False,
            name='byte_embedding'
        )

        # Gated convolution: conv_A (ReLU) is gated by conv_B (sigmoid).
        self.conv_A = layers.Conv1D(
            filters=num_filters,
            kernel_size=filter_size,
            strides=stride,
            padding='valid',
            activation='relu',
            name='conv_A'
        )

        self.conv_B = layers.Conv1D(
            filters=num_filters,
            kernel_size=filter_size,
            strides=stride,
            padding='valid',
            activation='sigmoid',
            name='conv_B'
        )

        # FIX: build the gating Multiply layer ONCE here. The original
        # called layers.multiply(..., name='gated_conv') inside call(),
        # instantiating a new layer (with the same name) on every forward
        # pass — an anti-pattern that triggers name collisions under
        # graph/functional tracing.
        self.gate = layers.Multiply(name='gated_conv')

        self.global_max_pool = layers.GlobalMaxPooling1D(name='global_max_pool')

        self.fc = layers.Dense(fc_size, activation='relu', name='fc_layer')

        if use_decov:
            self.decov_layer = DeCorrelationLoss(lambda_decov=lambda_decov)

        self.dropout = layers.Dropout(0.5, name='dropout')
        self.output_layer = layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs, training=None):
        """Forward pass.

        Args:
            inputs: Integer byte tensor of shape (batch, sequence_length).
            training: Standard Keras training flag; forwarded to Dropout.

        Returns:
            Sigmoid malware probability tensor of shape (batch, 1).
        """
        x = self.embedding(inputs)

        conv_a = self.conv_A(x)
        conv_b = self.conv_B(x)
        # Element-wise gating of the ReLU branch by the sigmoid branch.
        gated_conv = self.gate([conv_a, conv_b])

        pooled = self.global_max_pool(gated_conv)

        fc_out = self.fc(pooled)

        # DeCov penalty is registered via add_loss; tensor passes through.
        if self.use_decov:
            fc_out = self.decov_layer(fc_out)

        # FIX: always delegate to Dropout with the training flag. The
        # original guarded with `if training:`, which breaks when
        # `training` is a symbolic tensor in graph mode; Dropout is
        # already an identity when training is falsy, so no behavior is
        # lost in eager mode.
        fc_out = self.dropout(fc_out, training=training)

        output = self.output_layer(fc_out)

        return output
|
|
def create_malconv_model(max_input_length=2_000_000):
    """Build and compile a MalConv model with the paper's training setup.

    Uses SGD with Nesterov momentum 0.9 and an exponentially decaying
    learning rate (0.01, decayed by 0.96 every 1000 steps, staircase),
    binary cross-entropy loss, and accuracy/precision/recall/AUC metrics.

    Args:
        max_input_length: Maximum byte-sequence length passed to MalConv.

    Returns:
        A compiled ``MalConv`` instance ready for ``fit``.
    """
    net = MalConv(max_input_length=max_input_length)

    # Exponentially decayed learning rate, stepped every 1000 batches.
    schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.01,
        decay_steps=1000,
        decay_rate=0.96,
        staircase=True
    )

    sgd = tf.keras.optimizers.SGD(
        learning_rate=schedule,
        momentum=0.9,
        nesterov=True
    )

    metric_list = [
        'accuracy',
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.AUC(name='auc'),
    ]

    net.compile(
        optimizer=sgd,
        loss='binary_crossentropy',
        metrics=metric_list
    )

    return net
|
|