내용 |
import tensorflow as tf
import numpy as np
file_path = '/root/data/EleRhi/image_data_eleph_rhino.csv'
print(file_path, 'load ...')
file = np.loadtxt(file_path, delimiter=',')
np.random.shuffle(file)
file[:, : 75*75] = file[:, : 75*75] / 255
print('load complete ...')
entire_dat_num = file.shape[0]
dat_num = int(entire_dat_num * 0.8)
train_dat = file[:dat_num, :]
test_dat = file[dat_num:, :]
print(train_dat.shape, test_dat.shape)
# # load data
# filename_queue = tf.train.string_input_producer(['/root/data/EleRhi/image_data_eleph_rhino.csv'])
# reader = tf.TextLineReader()
# key, value = reader.read(filename_queue)
# record_defaults = [[0.> * (75*75+1)
# input_dat = tf.decode_csv(value, record_defaults=record_defaults)
# train_x_batch, train_y_batch = tf.train.batch([input_dat[:-1], input_dat[-1>, batch_size=100)
# function
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=.1)
return tf.Variable(initial)
def bias_variable(shape):
initial = tf.constant(.1, shape=shape)
return tf.Variable(initial)
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
def max_pool_2x2(x):
return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
def p_ReLU(x):
alpha = tf.Variable(1e-3, dtype=tf.float32)
temp = tf.nn.relu(x) * (1. + alpha)
subt = tf.abs(x) * alpha
return temp - subt
# cnn class
class simple_cnn:
def __init__(self, lr, iter, name, sess):
self.lr = lr
self.iter = iter
self.name = name
self.sess = sess
self.create_graph()
def create_graph(self):
#with tf.variable_scope(self.name):
# layer size
with tf.variable_scope(self.name, reuse=False):
c1, c2, c3, c4, a1, a2, a3 = 32, 64, 128, 128, 512, 512, 2
# input placeholder
self.x = tf.placeholder(tf.float32, [None, 75*75])
self.y_ = tf.placeholder(tf.int64, [None])
self.keep_prob = tf.placeholder(tf.float32)
x_image = tf.reshape(self.x, [-1, 75, 75, 1])
x_image = tf.pad(x_image, [[0, 0], [0, 1], [0, 1], [0, 0>)
# conv layers weights, biases - xavier
W_conv1_1 = tf.get_variable('W_conv1-1', shape=[3, 1, 1, c1],
initializer=tf.contrib.layers.xavier_initializer_conv2d()) * tf.sqrt(2.)
W_conv2_1 = tf.get_variable('W_conv2-1', shape=[3, 1, c1, c2],
initializer=tf.contrib.layers.xavier_initializer_conv2d()) * tf.sqrt(2.)
W_conv3_1 = tf.get_variable('W_conv3-1', shape=[3, 1, c2, c3],
initializer=tf.contrib.layers.xavier_initializer_conv2d()) * tf.sqrt(2.)
W_conv4_1 = tf.get_variable('W_conv4-1', shape=[3, 1, c3, c4],
initializer=tf.contrib.layers.xavier_initializer_conv2d()) * tf.sqrt(2.)
# W_conv1_2 = tf.get_variable('W_conv1-2', shape=[1, 3, c1, c1],
# initializer=tf.contrib.layers.xavier_initializer_conv2d()) * tf.sqrt(2.)
# W_conv2_2 = tf.get_variable('W_conv2-2', shape=[1, 3, c2, c2],
# initializer=tf.contrib.layers.xavier_initializer_conv2d()) * tf.sqrt(2.)
# W_conv3_2 = tf.get_variable('W_conv3-2', shape=[1, 3, c3, c3],
# initializer=tf.contrib.layers.xavier_initializer_conv2d()) * tf.sqrt(2.)
# W_conv4_2 = tf.get_variable('W_conv4-2', shape=[1, 3, c4, c4],
# initializer=tf.contrib.layers.xavier_initializer_conv2d()) * tf.sqrt(2.)
# b_conv1 = bias_variable([c1])
# b_conv2 = bias_variable([c2])
# b_conv3 = bias_variable([c3])
# b_conv4 = bias_variable([c4])
# conv layer 1, 2, 3, 4
# h_conv1 = p_ReLU(conv2d(x_image, W_conv1) + b_conv1)
# h_pool1 = max_pool_2x2(h_conv1)
# h_conv2 = p_ReLU(conv2d(h_pool1, W_conv2) + b_conv2)
# h_pool2 = max_pool_2x2(h_conv2)
# h_conv3 = p_ReLU(conv2d(h_pool2, W_conv3) + b_conv3)
# h_pool3 = max_pool_2x2(h_conv3)
# h_conv4 = p_ReLU(conv2d(h_pool3, W_conv4) + b_conv4)
# h_pool4 = max_pool_2x2(h_conv4)
h_conv1_1 = p_ReLU(conv2d(x_image, W_conv1_1))
# h_conv1_2 = p_ReLU(conv2d(h_conv1_1, W_conv1_2))
h_pool1 = max_pool_2x2(h_conv1_1)
h_conv2_1 = p_ReLU(conv2d(h_pool1, W_conv2_1))
# h_conv2_2 = p_ReLU(conv2d(h_conv2_1, W_conv2_2))
h_pool2 = max_pool_2x2(h_conv2_1)
h_conv3_1 = p_ReLU(conv2d(h_pool2, W_conv3_1))
# h_conv3_2 = p_ReLU(conv2d(h_conv3_1, W_conv3_2))
h_pool3 = max_pool_2x2(h_conv3_1)
h_conv4_1 = p_ReLU(conv2d(h_pool3, W_conv4_1))
# h_conv4_2 = p_ReLU(conv2d(h_conv4_1, W_conv4_2))
h_pool4 = max_pool_2x2(h_conv4_1)
# affine layers weights, biases
W_fc1 = tf.get_variable('W_fc1', shape=[5*5*c4, a1],
initializer=tf.contrib.layers.xavier_initializer()) * tf.sqrt(2.)
W_fc2 = tf.get_variable('W_fc2', shape=[a1, a2],
initializer=tf.contrib.layers.xavier_initializer()) * tf.sqrt(2.)
W_fc3 = tf.get_variable('W_fc3', shape=[a2, a3],
initializer=tf.contrib.layers.xavier_initializer()) * tf.sqrt(2.)
b_fc1 = bias_variable([a1])
b_fc2 = bias_variable([a2])
b_fc3 = bias_variable([a3])
# affine layer 1, 2, 3
h_pool4_flat = tf.reshape(h_pool4, [-1, 5*5*c4])
drop1 = tf.nn.dropout(h_pool4_flat, self.keep_prob) # dropout
h_fc1 = p_ReLU(tf.matmul(drop1, W_fc1) + b_fc1)
drop2 = tf.nn.dropout(h_fc1, self.keep_prob) # dropout
h_fc2 = p_ReLU(tf.matmul(drop2, W_fc2) + b_fc2)
# drop3 = tf.nn.dropout(h_fc2, self.keep_prob) # dropout
self.y = tf.matmul(h_fc2, W_fc3) + b_fc3
# cost, accuracy, etc
self.cross_entropy = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=self.y_, logits=self.y))
self.train_step = tf.train.AdamOptimizer(self.lr).minimize(self.cross_entropy)
correct_prediction = tf.equal(tf.argmax(self.y, 1), self.y_)
self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
self.confusion_mat = tf.contrib.metrics.confusion_matrix(labels=self.y_, predictions=tf.argmax(self.y, 1)
,num_classes=2, dtype='int32')
# config = tf.ConfigProto()
# config.gpu_options.per_process_gpu_memory_fraction = 0.8
# self.sess = tf.Session(config=config)
# self.sess.run(tf.global_variables_initializer())
def train(self, train_dat):
print(train_dat.shape)
for i in range(self.iter):
batch_mask = np.random.choice(train_dat.shape[0], 100)
batch = train_dat[batch_mask]
if i % 100 == 0:
train_accuracy, cost = self.sess.run([self.accuracy, self.cross_entropy], feed_dict={self.x: batch[:, :-1], self.y_: batch[:, -1], self.keep_prob: 1.})
print('step %d, batch accuracy %g' % (i, train_accuracy), '| cost %g' % cost)
self.sess.run(self.train_step, feed_dict={self.x: batch[:, :-1], self.y_: batch[:, -1], self.keep_prob: .5})
def test(self, test_dat):
acc_lst = []
confusion_mat = np.zeros((2, 2))
print(test_dat.shape)
for i in range(0, 3600, 100):
batch = test_dat[i: i+100]
acc, cm = self.sess.run([self.accuracy, self.confusion_mat],
feed_dict={self.x: batch[:, :-1], self.y_: batch[:, -1], self.keep_prob: 1.})
acc_lst.append(acc)
confusion_mat = tf.add(cm, confusion_mat)
print(sum(acc_lst)/ len(acc_lst))
print(self.sess.run(confusion_mat))
def en_test(self, test_dat):
lst = []
for i in range(0, 3600, 100):
batch = test_dat[i: i+100]
prediction = self.sess.run(self.y, feed_dict={self.x: batch[:, :-1], self.keep_prob: 1.})
lst += list(np.argmax(prediction, axis=1))
return lst
# cnn1 = simple_cnn(lr=5e-4, iter=10000, name='cnn1')
tf.reset_default_graph() # session 초기화 - 이전에 세션에 만들어졌던 그래프들을 모두 지운다
sess = tf.Session() # 세션 다시 선언
model_lst = []
for i in range(5):
model_lst.append(simple_cnn(sess=sess, lr=5e-4, iter=10000, name='cnn'+str(i)))
print(sess)
sess.run(tf.global_variables_initializer())
for i in model_lst:
i.train(train_dat)
result_lst = np.zeros(3600*2).reshape(-1, 2)
for i in model_lst:
temp_lst = i.en_test(test_dat)
for order, val in enumerate(temp_lst):
result_lst[order, val] += 1
result_ph = tf.placeholder('float', [None, 2])
label_ph = tf.placeholder(tf.int64, [None])
correct_prediction = tf.equal(tf.argmax(result_ph, 1), label_ph)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
print(sess.run(accuracy, feed_dict={result_ph:result_lst, label_ph:test_dat[:3600, -1]})) |