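"""Train a per-user indoor-localization model from WiFi RSSI fingerprints.

Run with the user name as the first command-line argument; the script reads
./IndoorLocalization/Data/<User>/Data.csv, trains a Keras model that maps
space-id probabilities to (x, y) coordinates, checkpoints it under
./IndoorLocalization/IndoorModels/<User>/kerasModel, and exports a
float16-quantized TFLite model next to it.
"""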
import numpy as np
import pandas as pd
from tensorflow.data import Dataset
from tensorflow import lite, float16, random, where, cast, float32, constant, reshape
from sklearn.model_selection import train_test_split
from os.path import isdir
import os
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.models import Model, load_model
from keras.layers import Layer, Dense, Dropout, Input, Embedding, Concatenate
from keras.optimizers import Adam
from keras.losses import mean_absolute_error
from keras.metrics import R2Score
import sys
# The user whose data should be used is passed as the first command-line argument.
User = sys.argv[1]
print(sys.argv[0])
Data = pd.read_csv(f"./IndoorLocalization/Data/{User}/Data.csv")
print(Data)

# Map the raw PHONEID values to consecutive indices 0..N-1.
PhonsId = np.sort(Data["PHONEID"].unique())
phoneidMap = {phoneid: i for i, phoneid in enumerate(PhonsId)}

def ReplaceId(id):
    # Remap roughly 1 row in 20 to an extra "unknown phone" index so the model
    # also sees devices it has no embedding history for.
    if np.random.randint(low=0, high=20) == 3:
        return len(phoneidMap)
    return phoneidMap[id]

Data["PHONEID"] = Data["PHONEID"].apply(ReplaceId)
Data = Data.dropna()
def CleanTrainData(df):
    # Split the frame into features (WiFi readings + PHONEID) and targets.
    target = df[["SPACEID", "LONGITUDE", "LATITUDE"]].copy()
    df = df.drop(["LONGITUDE", "LATITUDE", "SPACEID"], axis=1)
    return df, target

DataX, TargetY = CleanTrainData(Data)

# Min-max scale the coordinates into a local BuildingWidth x BuildingLength frame.
LONGITUDEMax = TargetY["LONGITUDE"].max()
LATITUDEMax = TargetY["LATITUDE"].max()
LONGITUDEMin = TargetY["LONGITUDE"].min()
LATITUDEMin = TargetY["LATITUDE"].min()
BuildingWidth = 20
BuildingLength = 20

def LONGITUDE_min_max_newrange(item):
    return ((item - LONGITUDEMin) / (LONGITUDEMax - LONGITUDEMin)) * BuildingWidth

def LATITUDE_min_max_newrange(item):
    return ((item - LATITUDEMin) / (LATITUDEMax - LATITUDEMin)) * BuildingLength

TargetY["LONGITUDE"] = TargetY["LONGITUDE"].apply(LONGITUDE_min_max_newrange)
TargetY["LATITUDE"] = TargetY["LATITUDE"].apply(LATITUDE_min_max_newrange)
# Stratify the split on SPACEID; the regression targets are the scaled (LONGITUDE, LATITUDE) columns.
X_train, X_test, y_train, y_test = train_test_split(
    DataX.values, TargetY.values[:, 1:], test_size=0.2,
    random_state=42, shuffle=True, stratify=TargetY.values[:, 0])
# Mean (LONGITUDE, LATITUDE) per SPACEID, used as anchor positions by the model.
SPACESGroups = TargetY.groupby("SPACEID")
SPACESGroupsmean = SPACESGroups.mean()
SPACEIDPosition = {f"{SPACEID}": (SPACESGroupsmean.loc[SPACEID, "LONGITUDE"], SPACESGroupsmean.loc[SPACEID, "LATITUDE"])
                   for SPACEID in SPACESGroups.groups.keys()}
SPACEIDPositionArray = np.array([list(SPACEIDPosition[f"{i}"]) for i in SPACESGroups.groups.keys()])
PlacesNumber = len(np.unique(TargetY.iloc[:, 0]))
# Largest mapped index + 1 so the "unknown phone" index always fits in the embedding table.
PhonesNumber = int(DataX["PHONEID"].max()) + 1
def ApplyNormalizationthenNois(X, Phoneid, Y):
    X = cast(X, dtype=float32)
    Y = cast(Y, dtype=float32)
    # Scale the RSSI readings from roughly [-100, 100] into [0, 1].
    X = (X + 100) / 200
    # Draw the augmentation decisions with tf.random so they are evaluated per
    # element instead of being frozen into the graph when map() traces this function.
    add_noise = random.uniform(shape=[]) < 2.0 / 3.0   # add noise to ~2/3 of the samples
    hide_phone = random.uniform(shape=[]) < 0.2        # hide the phone id on ~1/5 of them
    noise = random.normal(shape=X.shape, mean=0, stddev=0.1, dtype=float32)
    NoisedX = where(add_noise, x=X + noise, y=X)
    # Clip the noised readings back into [0, 1].
    NoisedX = where(NoisedX < 0, x=0.0, y=NoisedX)
    NoisedX = where(NoisedX > 1, x=1.0, y=NoisedX)
    Phoneid = where(hide_phone, x=cast(0, Phoneid.dtype), y=Phoneid)
    return (NoisedX, Phoneid), Y
def ApplyNormalizationOnly(X, Phoneid, Y):
    # Validation samples are only normalized (no noise); phone ids other than
    # 1 and 2 are collapsed to 0.
    X = cast(X, dtype=float32)
    Y = cast(Y, dtype=float32)
    X = (X + 100) / 200
    if Phoneid == 1:
        return (X, 1), Y
    elif Phoneid == 2:
        return (X, 2), Y
    else:
        return (X, 0), Y

TrainDataPipeline = Dataset.from_tensor_slices((X_train[:, :-1], X_train[:, -1], y_train)).map(ApplyNormalizationthenNois).batch(100)
TestDataPipeline = Dataset.from_tensor_slices((X_test[:, :-1], X_test[:, -1], y_test)).map(ApplyNormalizationOnly).batch(10)
class PositionAproxmator(Layer):
    """Maps space probabilities to a position: a probability-weighted combination
    of the per-space anchor positions, each shifted by a learnable offset."""
    def __init__(self, PlacesPosition, name="PositionAproxmator"):
        super(PositionAproxmator, self).__init__(name=name)
        self.PlacesPosition = constant(PlacesPosition, dtype=float32, name="PlacesPositions")
    def build(self, inputs_shape):
        self.W = self.add_weight(shape=(inputs_shape[1], 2), trainable=True, dtype=float32, name="PlacesWeight")
    def call(self, Probilites):
        return Probilites @ (self.PlacesPosition + self.W)
def MakeModel(SPACEIDPosition, PhonesNumber):
    # Resume from the saved Keras model if one already exists for this user.
    if isdir(f"./IndoorLocalization/IndoorModels/{User}/kerasModel"):
        return load_model(f"./IndoorLocalization/IndoorModels/{User}/kerasModel")
    WiFiReadings = Input(168)   # normalized RSSI readings
    Phoneid = Input(1)          # mapped phone-id index
    Embeding = Embedding(PhonesNumber, 64, embeddings_regularizer="l2")(Phoneid)
    X = Dense(128, activation="relu")(WiFiReadings)
    X = Dropout(0.2)(X)
    z = Dense(64, activation="relu")(X)
    # Mix the WiFi features with the phone embedding via an additive skip
    # connection and a concatenation.
    X = z + reshape(Embeding, shape=(-1, 64))
    X = Concatenate()([z, X, reshape(Embeding, shape=(-1, 64))])
    X = Dropout(0.2)(X)
    X = Dense(100, activation="relu", kernel_regularizer="l2")(X)
    X = Dropout(0.1)(X)
    X = Dense(64, activation="relu")(X)
    X = X + z
    X = Dense(64, activation="relu")(X)
    X = Dropout(0.2)(X)
    # Softmax over the known spaces, then convert the probabilities to (x, y).
    X = Dense(PlacesNumber, activation="softmax")(X)
    X = PositionAproxmator(SPACEIDPosition)(X)
    return Model(inputs=[WiFiReadings, Phoneid], outputs=[X])
model = MakeModel(SPACEIDPositionArray, PhonesNumber)
model.compile(optimizer=Adam(learning_rate=1e-4), loss=mean_absolute_error, metrics=[R2Score()])
# Make sure the per-user output directory exists before the checkpoint and TFLite export write to it.
os.makedirs(f"./IndoorLocalization/IndoorModels/{User}", exist_ok=True)
history = model.fit(TrainDataPipeline, validation_data=TestDataPipeline, epochs=5,
                    callbacks=[EarlyStopping(patience=3), ModelCheckpoint(f"./IndoorLocalization/IndoorModels/{User}/kerasModel")])
# Convert the trained model to a float16-quantized TFLite model for on-device use.
converter = lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [float16]
tflitemodel = converter.convert()
with open(f"./IndoorLocalization/IndoorModels/{User}/FinalHistoryModel.tflite", "wb") as file:
    file.write(tflitemodel)
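# Optional sanity check of the exported file (a minimal sketch, not part of the
# original training flow): load the .tflite model with the TFLite Interpreter,
# feed zeros of the right shape/dtype to each input, and run one inference.
interpreter = lite.Interpreter(model_path=f"./IndoorLocalization/IndoorModels/{User}/FinalHistoryModel.tflite")
interpreter.allocate_tensors()
for detail in interpreter.get_input_details():
    interpreter.set_tensor(detail["index"], np.zeros(detail["shape"], dtype=detail["dtype"]))
interpreter.invoke()
print("TFLite sanity-check output:", interpreter.get_tensor(interpreter.get_output_details()[0]["index"]))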