123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- import sys
- from copy import deepcopy
- from functools import reduce
- from random import choice
- import numpy as np
- import pandas as pd
- class GeneticAlgorithmConnect(object):
- def __init__(self, min_ini=pd.Timedelta('0 days 00:15:00')):
- self.flight_info = pd.DataFrame()
- self.min_ini = min_ini
- def initialize_flight_info(self):
- signal_day_ = pd.read_csv('Bin\\Data\\Signal_day.csv')
- signal_day_['DES_REAL_LANDING'] = pd.to_datetime(signal_day_['DES_REAL_LANDING'])
- signal_day_['DEP_REAL_TAKEOFF'] = pd.to_datetime(signal_day_['DEP_REAL_TAKEOFF']) + self.min_ini
- self.flight_info = signal_day_.sort_values('DES_REAL_LANDING')
- def marshaling_result(self, seat):
- self.flight_info['Seat'] = seat
- self.flight_info['DEP_REAL_TAKEOFF'] -= self.min_ini
- self.flight_info.to_csv('Bin\\Data\\Result.csv', index=False)
- class GeneticAlgorithm:
- def __init__(self,
- dna_length=50,
- population_number=100,
- cross_probability=0.3,
- mutation_probability=0,
- generate_number=20,
- target='max'
- ):
- self.dna_length = dna_length
- self.population_number = population_number
- self.cross_probability = cross_probability
- self.mutation_probability = mutation_probability
- self.generate_number = generate_number
- self.target = 1 if target == 'max' else -1
- self.connect_data = GeneticAlgorithmConnect()
- self.boys = pd.DataFrame([], columns=['name', 'fitness', 'actual_fitness'])
- def initialize_connect_class(self):
- self.connect_data.initialize_flight_info()
- def decimal_individual(self):
- fi = self.connect_data.flight_info
- pork = list(range(self.dna_length))
- boy = []
- for index, row in fi.iterrows():
- valid = self.valid_pork(boy, row) + list(set(pork) ^ set(list(set(boy)))) if boy else pork
- if valid:
- boy.append(choice(valid))
- else:
- print('初代无解,程序退出')
- sys.exit()
- return boy
- def valid_pork(self, boy, row):
- assigned_position = self.connect_data.flight_info.iloc[:len(boy)].copy()
- assigned_position['Seat'] = boy
- takeoff_time = assigned_position.groupby('Seat').apply(lambda x: x['DEP_REAL_TAKEOFF'].max())
- return takeoff_time[takeoff_time < row['DES_REAL_LANDING']].index.tolist()
- @staticmethod
- def duplicate_removal(some_list):
- return reduce(lambda x, y: x if y in x else x + [y], [[], ] + some_list)
- def get_boys(self):
- boys = []
- while len(boys) < self.population_number:
- boys.append(self.decimal_individual())
- boys = self.duplicate_removal(boys)
- self.boys = pd.DataFrame([], columns=['name', 'fitness']) # Collecting fitness in a moment
- self.boys['name'] = boys
- def get_fitness(self):
- fi = self.connect_data.flight_info.copy()
- fitness = []
- for boy in self.boys['name'].values:
- fi['Seat'] = boy
- fitness.append(fi.groupby('Seat').apply(self.get_diff).sum())
- self.make_up(fitness) # never go out without make-up
- self.sort_values()
- def make_up(self, fitness):
- self.boys['actual_fitness'] = fitness
- self.boys['fitness'] = fitness
- self.boys = self.boys.fillna(max(fitness) if self.target > 0 else min(fitness))
- self.boys['fitness'] *= self.target
- self.boys['fitness'] = self.boys['fitness'] - self.boys['fitness'].min() + 1e-5
- self.boys['fitness'] = self.boys['fitness'].ffill()
- def sort_values(self): # It's always a ascending order
- self.boys = self.boys.sort_values('fitness').reset_index(drop=True)
- @staticmethod
- def get_diff(x):
- x = x.sort_values('DES_REAL_LANDING')
- if (x['DES_REAL_LANDING'] < x['DEP_REAL_TAKEOFF'].shift()).any():
- return np.nan
- return (x['DES_REAL_LANDING'] - x['DEP_REAL_TAKEOFF'].shift()).sum().total_seconds()
- def selection(self):
- idx = np.random.choice(self.population_number, size=self.population_number, replace=True,
- p=self.boys['fitness'] / self.boys['fitness'].sum())
- self.boys = self.boys.iloc[idx].copy()
- self.sort_values()
- def crossover(self):
- boys = self.boys['name'].values
- girls = deepcopy(boys)
- def should_recovery(index_):
- if not self.is_the_boy_healthy(boy):
- boy[:] = girls[index_]
- for index, boy in enumerate(boys): # some children is unattractive
- if index == 0:
- boy[:] = girls[-1]
- elif np.random.rand() < self.cross_probability: # the boy should change
- girl = choice(girls)
- exchange_of_chromosomes = np.random.randint(self.dna_length, size=self.population_number)
- for chromosome in list(set(exchange_of_chromosomes.tolist())):
- boy[chromosome] = girl[chromosome]
- should_recovery(index)
- if self.mutation_probability > 0:
- for chromosome in range(self.dna_length):
- if np.random.rand() < self.mutation_probability:
- boy[chromosome] = np.random.randint(self.dna_length)
- should_recovery(index)
- def is_the_boy_healthy(self, boy):
- fi = self.connect_data.flight_info.copy()
- fi['Seat'] = boy
- return fi.groupby('Seat').apply(self.physical_examination).all()
- @staticmethod
- def physical_examination(x):
- if x.shape[0] > 1:
- x = x.sort_values('DES_REAL_LANDING')
- return not (x['DEP_REAL_TAKEOFF'].shift() > x['DES_REAL_LANDING']).any()
- return True
- def marshaling_result(self):
- self.connect_data.marshaling_result(self.boys.iloc[0]['name'])
|