Spaces:
Sleeping
Sleeping
| import tensorflow as tf | |
| import numpy as np | |
| from tensorflow_hub import KerasLayer | |
| import os | |
| from keras.layers import Dense,Dropout,Input,BatchNormalization,Lambda | |
| from keras.models import Model | |
| from keras.optimizers import Adam | |
| from keras import Sequential | |
| from keras.callbacks import EarlyStopping,ModelCheckpoint | |
| from keras.layers.experimental.preprocessing import RandomRotation,RandomFlip,RandomCrop,PreprocessingLayer | |
| from tensorflow.math import l2_normalize | |
| from sys import argv | |
# Name of the user this run is for; read from the first command-line argument
# and interpolated into the output paths of the checkpoint and TFLite model.
User = argv[1]

# Dataset locations, all relative to the current working directory.
# Train/Test are scanned as "one sub-folder per label"; Negative and Dataset
# are scanned as flat folders of files.
DatasetPath = "./FaceRecognition/ExtactedFaces/Dataset/"
NegativeDatasetPath = "./FaceRecognition/ExtactedFaces/Negative/"
TrainUsersDatasetPath = "./FaceRecognition/ExtactedFaces/Train/"
TestUsersDatasetPath = "./FaceRecognition/ExtactedFaces/Test/"
def DatasetPaths(UserDatapath, state, NegativeDatasetPath=None, DatasetPath=None):
    """Collect image paths and labels from the dataset folders.

    ``UserDatapath`` must contain one sub-folder per label; every file inside
    a sub-folder is labelled with that sub-folder's name.

    Args:
        UserDatapath: Folder holding one sub-folder per user.
        state: When truthy, the negative folder and the generic dataset
            folder are scanned as well.
        NegativeDatasetPath: Flat folder of negative images. The label of a
            file is the part of its name before the first ",". Required when
            ``state`` is truthy.
        DatasetPath: Flat folder of generic images. Required when ``state``
            is truthy.

    Returns:
        ``(UserFiles, UserLabels)`` when ``state`` is falsy, otherwise
        ``(Negativefiles, NegativeLabels, UserFiles, UserLabels,
        DatasetPathfiles)``. Every element is a numpy array.

    Raises:
        ValueError: If ``state`` is truthy and one of the extra folders is
            missing.
    """
    UserFiles = []
    UserLabels = []
    # Sorted listings make the output (and the triplets built from it)
    # reproducible; os.listdir() returns entries in arbitrary order.
    for folder in sorted(os.listdir(UserDatapath)):
        folder_path = os.path.join(UserDatapath, folder)
        if not os.path.isdir(folder_path):
            # Stray files (e.g. .DS_Store) are not user folders; skip them
            # instead of crashing in os.listdir().
            continue
        for file in sorted(os.listdir(folder_path)):
            UserFiles.append(os.path.join(folder_path, file))
            UserLabels.append(folder)

    if not state:
        return np.array(UserFiles), np.array(UserLabels)

    if NegativeDatasetPath is None or DatasetPath is None:
        raise ValueError(
            "NegativeDatasetPath and DatasetPath are required when state is True"
        )

    negative_names = sorted(os.listdir(NegativeDatasetPath))
    Negativefiles = [os.path.join(NegativeDatasetPath, name) for name in negative_names]
    NegativeLabels = [name.split(",")[0] for name in negative_names]
    DatasetPathfiles = [
        os.path.join(DatasetPath, name) for name in sorted(os.listdir(DatasetPath))
    ]
    return (
        np.array(Negativefiles),
        np.array(NegativeLabels),
        np.array(UserFiles),
        np.array(UserLabels),
        np.array(DatasetPathfiles),
    )
# One scan returns every file list the training pipeline needs.
(
    Negativefiles,
    NegativeLabels,
    UserFiles,
    UserLabels,
    DatasetPathfiles,
) = DatasetPaths(TrainUsersDatasetPath, True, NegativeDatasetPath, DatasetPath)
TrainClasses = np.unique(UserLabels)
TrainClassesCount = len(TrainClasses)

# The test split only needs the labelled user images.
TestUserFiles, TestUserLabels = DatasetPaths(
    UserDatapath=TestUsersDatasetPath, state=False
)
TestClasses = np.unique(TestUserLabels)
TestClassesCount = len(TestClasses)

# Constant colour overlay with channel values (200, 100, 200), scaled to
# [0, 1]; it is blended into images by PreProcessInput.
mask = np.zeros(shape=(224, 224, 3))
mask[...] = (200, 100, 200)
mask = tf.cast(mask / 255, tf.float32)

# Shared random augmentation layers.
FliPer = RandomFlip(mode="horizontal")
Rotater = RandomRotation([-0.135, 0.135])
def PreProcessInput(Image, num):
    """Apply the augmentation selected by ``num`` to ``Image``.

    * ``num == 0``: horizontal flip (``FliPer``), then rotation.
    * ``num == 1``: blend 25% of the colour ``mask`` in, then rotation.
    * ``num == 2``: rotation only.
    * anything larger: the image is returned unchanged.
    """
    if num == 0:
        augmented = FliPer(Image)
    elif num == 1:
        augmented = 0.75 * Image + 0.25 * mask
    else:
        augmented = Image

    # Only the first three codes are rotated; the rest pass through as-is.
    if num > 2:
        return augmented
    return Rotater(augmented)
def _decode_face(path):
    """Read a JPEG file and return it as a float32 224x224x3 tensor in [0, 1]."""
    raw = tf.io.read_file(path)
    # channels=3 forces RGB output so grayscale/CMYK files do not yield a
    # channel count that breaks the 3-channel mask blend and the model input.
    image = tf.image.decode_jpeg(raw, channels=3)
    image = tf.cast(image, tf.float32)
    image = tf.image.resize(
        image, [224, 224], method=tf.image.ResizeMethod.NEAREST_NEIGHBOR
    )
    return image / 255


def _random_augmentation_code():
    """Draw a scalar int32 in [0, 6) selecting a PreProcessInput branch."""
    return tf.random.uniform(shape=[], minval=0, maxval=6, dtype=tf.int32)


def load_image(Anchor, Positive, Nagative, State):
    """Load one (anchor, positive, negative) triplet of images from disk.

    Args:
        Anchor: Path of the anchor image.
        Positive: Path of the positive image.
        Nagative: Path of the negative image (parameter name kept as-is for
            compatibility with existing callers).
        State: Boolean flag; when true each image is passed through
            ``PreProcessInput`` with its own independently drawn code.

    Returns:
        Tuple ``(Anchor, Positive, Negative)`` of float32 images scaled to
        [0, 1] with spatial size 224x224.
    """
    Anchor = _decode_face(Anchor)
    Positive = _decode_face(Positive)
    Negative = _decode_face(Nagative)
    if State:
        Anchor = PreProcessInput(Anchor, _random_augmentation_code())
        Positive = PreProcessInput(Positive, _random_augmentation_code())
        Negative = PreProcessInput(Negative, _random_augmentation_code())
    return (Anchor, Positive, Negative)
def DatasetTripletsGenerator(State):
    """Yield ``(anchor, positive, negative, State)`` path triplets.

    For every class, each unordered pair of its images is used as
    (anchor, positive). The negative is drawn at random from one of three
    pools, chosen by a uniform draw in 0..9:

    * 0-3: files in ``Negativefiles`` whose label equals the class,
    * 4-7: images of the other users in the same split,
    * 8-9: the generic ``DatasetPathfiles`` pool.

    If the chosen pool is empty the generic pool is used instead.

    Args:
        State: Truthy selects the training split, falsy the test split. The
            value is yielded back unchanged as the fourth element.
    """
    from itertools import combinations

    # Both branches bind the same local names so the loop below works for
    # the test split too.
    if State:
        image_paths, image_labels, classes = UserFiles, UserLabels, TrainClasses
    else:
        image_paths, image_labels, classes = TestUserFiles, TestUserLabels, TestClasses

    for class_ in classes:
        files = image_paths[image_labels == class_]
        # Candidate pools are loop-invariant for a class; compute them once.
        class_negatives = Negativefiles[NegativeLabels == class_]
        other_users = image_paths[image_labels != class_]

        for ancore, positive in combinations(files, 2):
            roll = np.random.randint(0, high=10)
            if roll <= 3:
                candidates = class_negatives
            elif roll <= 7:
                candidates = other_users
            else:
                candidates = DatasetPathfiles
            if len(candidates) == 0:
                candidates = DatasetPathfiles
            negative = np.random.choice(candidates)
            yield ancore, positive, negative, State
def EmbeddingImageLoader(Anchor, Label):
    """Load one labelled image for embedding extraction.

    Args:
        Anchor: Path of the JPEG file.
        Label: Label attached to the image; passed through unchanged.

    Returns:
        Tuple ``(image, Label)`` where ``image`` is a float32 224x224x3
        tensor scaled to [0, 1]. No augmentation is applied.
    """
    Anchor = tf.io.read_file(Anchor)
    # channels=3 forces RGB so grayscale/CMYK files still match the
    # 3-channel model input.
    Anchor = tf.image.decode_jpeg(Anchor, channels=3)
    Anchor = tf.cast(Anchor, tf.float32)
    Anchor = tf.image.resize(
        Anchor, [224, 224], method=tf.image.ResizeMethod.NEAREST_NEIGHBOR
    )
    return (Anchor / 255, Label)
# Element spec shared by both triplet generators: three scalar path strings
# plus the scalar boolean train/test flag.
TripletTypes = (tf.string, tf.string, tf.string, tf.bool)
TripletShapes = ((), (), (), ())

# Training triplets: generated with State=True, so load_image augments them.
# NOTE(review): shuffle() runs after batch(), so whole batches of 2 are
# shuffled rather than individual triplets - confirm this is intended.
TrainData = tf.data.Dataset.from_generator(
    DatasetTripletsGenerator,
    args=[True],
    output_types=TripletTypes,
    output_shapes=TripletShapes,
    name="DataLoaderPipeline",
)
TrainData = TrainData.map(load_image).batch(2).shuffle(buffer_size=10)

# Validation triplets: generated with State=False, loaded without augmentation.
TestData = tf.data.Dataset.from_generator(
    DatasetTripletsGenerator,
    args=[False],
    output_types=TripletTypes,
    output_shapes=TripletShapes,
    name="DataLoaderPipeline",
)
TestData = TestData.map(load_image).batch(2)

# Every training image with its label, one image per batch, used to build
# the per-user reference embeddings.
EmbeddingData = tf.data.Dataset.from_tensor_slices(
    (list(UserFiles), list(UserLabels))
)
EmbeddingData = EmbeddingData.map(EmbeddingImageLoader).batch(1)
class DistanceLayer(tf.keras.layers.Layer):
    """Layer returning the squared distances anchor-positive and anchor-negative."""

    def __init__(self):
        super().__init__()

    def call(self, anchor, positive, negative):
        """Return ``(dis_ap, dis_an)``: sums of squared differences over axis 1."""
        positive_gap = tf.square(anchor - positive)
        negative_gap = tf.square(anchor - negative)
        return tf.reduce_sum(positive_gap, 1), tf.reduce_sum(negative_gap, 1)
def GetEncoder():
    """Return the image encoder used by all three branches of the network.

    If a previously saved model directory exists it is loaded; otherwise a
    new encoder is built from the frozen pretrained backbone followed by a
    small trainable head that outputs L2-normalised 128-d embeddings.
    """
    # NOTE(review): the existence check looks at ".../FaceModel/keras/" while
    # load_model reads ".../FaceModel/" - confirm which path is intended.
    if os.path.isdir("./FaceRecognition/FaceModel/keras/"):
        return tf.keras.models.load_model("./FaceRecognition/FaceModel/")

    # Pretrained backbone, kept frozen.
    pretrained_model = KerasLayer("./prtrained/archive/", trainable=False)
    encode_model = Sequential(
        [
            pretrained_model,
            Dropout(0.2),
            Dense(512, activation="relu"),
            BatchNormalization(),
            Dense(128, activation="relu"),
            # Normalise each embedding on its own. Without an explicit axis
            # l2_normalize treats the whole batch as one vector, so the norm
            # of every embedding would depend on the batch size.
            Lambda(lambda x: l2_normalize(x, axis=-1)),
        ],
        name="Encoder",
    )
    return encode_model
def SiameseNetwork(inputshape=(224, 224, 3)):
    """Build the three-input network that outputs the two triplet distances.

    The anchor, positive and negative inputs go through one shared encoder
    (from ``GetEncoder``); a ``DistanceLayer`` then produces the
    anchor-positive and anchor-negative distances as the model outputs.
    """
    # Inputs are created in anchor, positive, negative order, before the
    # encoder, exactly as the model's input list expects them.
    branch_inputs = [Input(shape=inputshape) for _ in range(3)]
    shared_encoder = GetEncoder()
    branch_embeddings = [shared_encoder(branch) for branch in branch_inputs]
    distances = DistanceLayer()(*branch_embeddings)
    return Model(inputs=branch_inputs, outputs=distances)


# Single network instance used for training and, later, embedding extraction.
siames_net = SiameseNetwork()
class SiamesModel(Model):
    """Keras model wrapping the siamese network with a custom triplet loss.

    The wrapped network must return ``(anchor_positive_distance,
    anchor_negative_distance)``. Training pushes positive distances below
    ``0.2 * DesiredDistance`` and negative distances above
    ``DesiredDistance``. Every tracker is a running mean of a per-batch
    statistic (so "P_max" is the mean of the per-batch maxima, and so on).
    """

    def __init__(self, siames_net, DesiredDistance):
        super().__init__()
        self.Model = siames_net
        self.DesiredDistance = DesiredDistance
        self.LossTracker = tf.keras.metrics.Mean(name="Loss")
        self.VALTracker = tf.keras.metrics.Mean(name="VAL")
        self.PmeanTracker = tf.keras.metrics.Mean(name="P_mean")
        self.PmaxTracker = tf.keras.metrics.Mean(name="P_max")
        self.PstdTracker = tf.keras.metrics.Mean(name="P_std")
        self.FARTracker = tf.keras.metrics.Mean(name="FAR")
        self.N_meanTracker = tf.keras.metrics.Mean(name="N_mean")
        self.NstdTracker = tf.keras.metrics.Mean(name="N_std")
        self.NminTracker = tf.keras.metrics.Mean(name="N_min")

    def call(self, data, training=None):
        """Forward ``data`` to the wrapped siamese network."""
        return self.Model(data, training=training)

    def train_step(self, data):
        """Run one optimisation step on a batch of triplets and return the logs."""
        with tf.GradientTape() as Tape:
            # training=True is required so Dropout is active and
            # BatchNormalization uses batch statistics; a layer called
            # without it falls back to inference behaviour.
            AP_distanc, AN_distance = self.Model(data, training=True)
            loss = self.TripLoss(AP_distanc, AN_distance)
        gradients = Tape.gradient(loss, self.Model.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.Model.trainable_weights))
        self.DistanceEval(AP_distanc, AN_distance)
        self.LossTracker.update_state(loss)
        return self._metric_results()

    def test_step(self, data):
        """Evaluate one batch of triplets and return the logs."""
        AP_distanc, AN_distance = self.Model(data, training=False)
        loss = self.TripLoss(AP_distanc, AN_distance)
        self.LossTracker.update_state(loss)
        self.DistanceEval(AP_distanc, AN_distance)
        return self._metric_results()

    def _metric_results(self):
        """Return the current value of every tracker keyed by its log name."""
        return {
            "VAL": self.VALTracker.result(),
            "P_mean": self.PmeanTracker.result(),
            "P_max": self.PmaxTracker.result(),
            "P_std": self.PstdTracker.result(),
            "FAR": self.FARTracker.result(),
            "N_mean": self.N_meanTracker.result(),
            "N_min": self.NminTracker.result(),
            "N_std": self.NstdTracker.result(),
            "Loss": self.LossTracker.result(),
        }

    def TripLoss(self, ap_distance, an_distance):
        """Mean of hinge(ap - 0.2*D) + hinge(D - an), with D = DesiredDistance."""
        positive_term = tf.maximum(ap_distance - 0.2 * self.DesiredDistance, 0.0)
        negative_term = tf.maximum(self.DesiredDistance - an_distance, 0.0)
        return tf.reduce_mean(positive_term + negative_term)

    @property
    def metrics(self):
        # Must be a property: Keras iterates over ``self.metrics`` to reset
        # the trackers between epochs and between fit/evaluate runs.
        return [
            self.LossTracker,
            self.VALTracker,
            self.PmaxTracker,
            self.PmeanTracker,
            self.PstdTracker,
            self.FARTracker,
            self.N_meanTracker,
            self.NminTracker,
            self.NstdTracker,
        ]

    def DistanceEval(self, P_distance, N_distance):
        """Update every distance tracker from one batch of distances."""
        P_pred, N_pred = self.TDEvaluation(P_distance, N_distance)
        # VAL: fraction of positive pairs within the desired distance.
        self.VALTracker.update_state(tf.reduce_mean(tf.cast(P_pred, tf.float32)))
        # FAR: fraction of negative pairs NOT beyond the desired distance.
        self.FARTracker.update_state(1 - tf.reduce_mean(tf.cast(N_pred, tf.float32)))
        self.PmeanTracker.update_state(tf.reduce_mean(P_distance))
        self.N_meanTracker.update_state(tf.reduce_mean(N_distance))
        self.PstdTracker.update_state(tf.math.reduce_std(P_distance))
        self.NstdTracker.update_state(tf.math.reduce_std(N_distance))
        self.PmaxTracker.update_state(tf.reduce_max(P_distance))
        self.NminTracker.update_state(tf.reduce_min(N_distance))

    def TDEvaluation(self, P_distance, N_distance):
        """Return int8 flags: positives within, and negatives beyond, the threshold."""
        return (
            tf.cast(P_distance <= self.DesiredDistance, dtype=tf.int8),
            tf.cast(N_distance > self.DesiredDistance, dtype=tf.int8),
        )
# Distance threshold separating "same person" from "different person".
DesiredDistance = 1
Optimizer = Adam(learning_rate=1e-4)
Siamesmodel = SiamesModel(siames_net, DesiredDistance)
# Use the optimizer created above instead of building a second identical one.
Siamesmodel.compile(optimizer=Optimizer, weighted_metrics=[])
Siamesmodel.fit(
    TrainData,
    validation_data=TestData,
    epochs=1,
    callbacks=[
        # The model logs its loss under the key "Loss", so the validation
        # value is "val_Loss"; the default monitor "val_loss" is never logged
        # and early stopping would silently do nothing.
        EarlyStopping(monitor="val_Loss", mode="min", patience=3),
        ModelCheckpoint(f"./FaceRecognition/FaceModel/{User}/kerasModel"),
    ],
)
def EmbeddingMaker(DataPipline, Model):
    """Return the mean embedding of every name found in ``DataPipline``.

    Args:
        DataPipline: Iterable of ``(Image, Name)`` batches. Only the first
            entry of ``Name`` is read, and its ``.numpy()`` value is used as
            the name (bytes are decoded as UTF-8).
        Model: Callable mapping an image batch to an embedding.

    Returns:
        Dict mapping each name to the element-wise mean of the squeezed
        embeddings of all of its images, as a numpy array.
    """
    Embedding = {}
    NamesTimer = {}
    for Image, Name in DataPipline:
        raw_name = Name[0].numpy()
        # Decode properly instead of slicing the repr of a bytes object,
        # which mangles non-ASCII names.
        Name = raw_name.decode("utf-8") if isinstance(raw_name, bytes) else str(raw_name)
        vector = np.squeeze(np.asarray(Model(Image)))
        # Look the full name up: testing Name[0] (its first character) never
        # matched, so earlier embeddings were overwritten instead of summed.
        if Name not in Embedding:
            Embedding[Name] = vector
            NamesTimer[Name] = 1
        else:
            Embedding[Name] = Embedding[Name] + vector
            NamesTimer[Name] = NamesTimer[Name] + 1
    for Name in Embedding:
        Embedding[Name] = Embedding[Name] / NamesTimer[Name]
    return Embedding
# siames_net.layers[3] is the layer after the three inputs; it is used here
# as the embedding model.
Embedding = EmbeddingMaker(EmbeddingData, siames_net.layers[3])

# Reference embeddings in dict order, plus a name -> 1-based index table
# matching the index the lite model returns (0 means "no match").
EmbeddingLabel = [Embedding[Name] for Name in Embedding]
EmbeddingNames = {Name: Index + 1 for Index, Name in enumerate(Embedding)}
class LiteModel(tf.Module):
    """Encoder plus nearest-reference lookup, packaged for TFLite export.

    Args:
        FaceModel: Callable mapping an image batch to an embedding.
        FacesEmbedding: Sequence of reference embeddings; position ``i`` is
            reported as identity ``i + 1``.
        name: Module name.
    """

    def __init__(self, FaceModel, FacesEmbedding, name="FaceLiteModel"):
        # tf.Module subclasses must initialise the base class; this also
        # makes the ``name`` argument take effect.
        super().__init__(name=name)
        self.FaceModel = FaceModel
        self.FacesEmdedding = FacesEmbedding

    # A tf.function with a fixed signature is what makes
    # ``__call__.get_concrete_function()`` available for the converter.
    @tf.function(
        input_signature=[
            tf.TensorSpec(shape=[1, 224, 224, 3], dtype=tf.float32),
            tf.TensorSpec(shape=[], dtype=tf.float32),
        ]
    )
    def __call__(self, Image, Threshold):
        """Return ``(Name, Distance)`` for the closest reference embedding.

        ``Name`` is the 1-based index of the first reference whose squared
        distance to the image embedding is strictly below ``Threshold`` and
        minimal; ``Distance`` is that distance. If no reference qualifies,
        ``(0, Threshold)`` is returned.
        """
        Threshold = tf.cast(Threshold, tf.float32)
        if len(self.FacesEmdedding) == 0:
            return tf.constant(0, dtype=tf.int32), Threshold

        # Stack the references once at trace time into an [N, D] constant.
        Stored = tf.constant(
            np.stack([np.ravel(Item) for Item in self.FacesEmdedding]).astype(np.float32)
        )
        Embedding = tf.reshape(self.FaceModel(Image), [1, -1])
        Distances = tf.reduce_sum(tf.square(Embedding - Stored), axis=1)
        # argmin returns the first minimum, matching a first-wins linear scan.
        Best = tf.argmin(Distances, output_type=tf.int32)
        BestDistance = tf.gather(Distances, Best)
        Matched = BestDistance < Threshold
        Name = tf.where(Matched, Best + 1, 0)
        Distance = tf.where(Matched, BestDistance, Threshold)
        return Name, Distance
litemodel = LiteModel(siames_net.layers[3], FacesEmbedding=EmbeddingLabel)


# The converter needs a concrete function, which requires a tf.function with
# a known input signature: one 224x224 RGB image and a scalar threshold.
@tf.function(
    input_signature=[
        tf.TensorSpec(shape=[1, 224, 224, 3], dtype=tf.float32),
        tf.TensorSpec(shape=[], dtype=tf.float32),
    ]
)
def LiteInference(Image, Threshold):
    """Traceable entry point that forwards to ``litemodel``."""
    return litemodel(Image, Threshold)


converter = tf.lite.TFLiteConverter.from_concrete_functions(
    [LiteInference.get_concrete_function()], litemodel
)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflitemodel = converter.convert()

# Make sure the per-user output folder exists before writing into it.
os.makedirs(f"./FaceRecognition/FaceModel/{User}", exist_ok=True)
with open(f"./FaceRecognition/FaceModel/{User}/FaceXModel.tflite", "wb") as file:
    file.write(tflitemodel)