Skip to content

Commit

Permalink
Merge pull request #22 from DeltaML/FA-19
Browse files Browse the repository at this point in the history
Fa 19
  • Loading branch information
agrojas authored Oct 14, 2019
2 parents 4369847 + 0809f2d commit d681c4e
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 20 deletions.
1 change: 1 addition & 0 deletions federated_aggregator/config/dev/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
DATA_OWNER_PORT = 5000
ITERS_UNTIL_PARTIAL_RESULT = 2
SPLIT_COEFFICIENT = 0.70
MIN_DATA_OWNERS = 4

# Model buyer
MODEL_BUYER_PORT = 9090
Expand Down
1 change: 1 addition & 0 deletions federated_aggregator/config/prod/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
DATA_OWNER_PORT = 5000
ITERS_UNTIL_PARTIAL_RESULT = 2
SPLIT_COEFFICIENT = 0.70
MIN_DATA_OWNERS = 4

# Model buyer
MODEL_BUYER_PORT = 9090
Expand Down
11 changes: 9 additions & 2 deletions federated_aggregator/connectors/data_owner_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ def get_data_owners_model(self, model_data):
results = self.async_thread_pool.run(executable=self._send_get_request_to_data_owner, args=args)
return [result for result in results]

@optimized_collection_response(optimization=np.asarray, active=True)
def send_requirements_to_data_owners(self, data_owners, data):
    """POST the training requirements to every data owner in parallel.

    Fire-and-forget: the individual responses are not returned to callers.
    NOTE(review): the @optimized_collection_response decorator looks vestigial
    now that nothing is returned — confirm whether it can be dropped.
    :param data_owners: iterable of registered data owners (each exposes .host)
    :param data: requirements payload forwarded as-is to each owner
    """
    args = [
        ("http://{}:{}/trainings".format(data_owner.host, self.data_owner_port), data)
        for data_owner in data_owners
    ]
    # Single dispatch through the async pool (the span previously contained a
    # duplicated call — one capturing `results`, one not; only one is intended).
    self.async_thread_pool.run(executable=self._send_post_request_to_data_owner, args=args)

@optimized_collection_response(optimization=np.asarray, active=True)
def get_linked_data_owners(self, data_owners, model_id):
    """GET each data owner's link status for `model_id`.

    :param data_owners: iterable of registered data owners (each exposes .host)
    :param model_id: String id of the model being trained
    :return: one response per queried data owner (order follows `data_owners`)
    """
    args = [
        "http://{}:{}/trainings/{}".format(data_owner.host, self.data_owner_port, model_id)
        for data_owner in data_owners
    ]
    results = self.async_thread_pool.run(executable=self._send_get_request_to_data_owner, args=args)
    # list() instead of the redundant identity comprehension [r for r in results].
    return list(results)

def send_mses(self, validators, model_data, mses):
Expand Down
3 changes: 3 additions & 0 deletions federated_aggregator/models/global_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ def __init__(self,
model_id,
model_type,
model_status,
data_owners,
local_trainers,
validators,
model,
Expand All @@ -20,6 +21,7 @@ def __init__(self,
:param model_id: String
:param public_key: String
:param model_type: String
:param data_owners: List[]
:param local_trainers: List[]
:param validators: List[]
:param model: LinearRegression
Expand All @@ -29,6 +31,7 @@ def __init__(self,
self.buyer_host = buyer_host
self.model_type = model_type
self.model_status = model_status
self.data_owners = data_owners
self.local_trainers = local_trainers
self.validators = validators
self.model = model
Expand Down
15 changes: 15 additions & 0 deletions federated_aggregator/resources/model_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
'mse': fields.Float(required=True, description='Current mse')
})

# Marshal schema for the model/data-owner link endpoint: identifies which
# data owner was linked to which model.
link = api.model(name='Link', model={
    'model_id': fields.String(required=True, description='Model id'),
    'data_owner_id': fields.String(required=True, description='DataOwner id')
})


@api.route('', endpoint='model_resources_ep')
class ModelResources(Resource):
Expand All @@ -34,3 +39,13 @@ def post(self):
def get(self):
    """Return every registered global model."""
    logging.info("Get models")
    aggregator = FederatedAggregator()
    return aggregator.get_models()


@api.route('/<model_id>/accept', endpoint='model_resources_link_ep')
class ModelLinkResources(Resource):
    """Lets a data owner accept (join) the training of a model.

    Renamed from ModelResources: the previous name shadowed the resource class
    registered on '' above. Routing is unaffected — flask-restx registers by
    route/endpoint at decoration time, not by class name.
    """

    @api.doc('Register data owner with data for training')
    @api.marshal_with(link, code=200)
    def post(self, model_id):
        """Link the posted data owner to `model_id` and echo the link back."""
        data = request.get_json()
        FederatedAggregator().link_data_owner_to_model(model_id, data['data_owner_id'])
        # Return a payload matching the `link` marshal model (previously the
        # handler returned None, so marshal_with serialized an empty object).
        return {'model_id': model_id, 'data_owner_id': data['data_owner_id']}
8 changes: 4 additions & 4 deletions federated_aggregator/services/data_owner_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def register_data_owner(self, data_owner_data):
def get_data_owners(self):
    """Return the currently registered data owners as a list."""
    registered = self.data_owners.values()
    return list(registered)

def send_requirements(self, data):
    """Forward the buyer's training requirements to every registered data owner.

    (The span previously carried both the old and new def lines from the diff;
    `send_requirements` is the name callers in this commit use.)
    :param data: requirements payload forwarded to the connector as-is
    :return: whatever the connector returns for the broadcast
    """
    return self.data_owner_connector.send_requirements_to_data_owners(list(self.data_owners.values()), data)

def get_linked_data_owners_to_model(self, data):
    """Return the registered data owners already linked to the given model.

    Queries every registered data owner for its link status on the model and
    keeps those that report themselves linked to this exact model_id.
    (Reconstructed from diff residue: the span carried duplicated old/new def
    and body lines plus an embedded hunk marker.)
    :param data: dict containing at least 'model_id'
    :return: list of data owner objects linked to the model
    """
    linked_data_owners = []
    owners_with_data = self.data_owner_connector.get_linked_data_owners(list(self.data_owners.values()), data['model_id'])
    for data_owner_link in owners_with_data:
        if (data['model_id'] == data_owner_link['model_id']) and (data_owner_link['linked']):
            data_owner_key = data_owner_link['data_owner_id']
            linked_data_owners.append(self.data_owners[data_owner_key])
    return linked_data_owners
Expand Down
52 changes: 38 additions & 14 deletions federated_aggregator/services/federated_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,20 @@ def init(self, encryption_service, data_owner_service, config):
def get_models(self):
return list(self.global_models.values())

def federated_learning_wrapper(self, data):
    # Entry point handed to the background thread started by process():
    # delegates straight to phase one of the training flow.
    return self.federated_learning_first_phase(data)

def federated_learning_wrapper2(self, model_id):
    # Entry point handed to the background thread started by continue_process():
    # delegates straight to phase two of the training flow.
    return self.federated_learning_second_phase(model_id)

def process(self, remote_address, data):
    """Start first-phase processing of a buyer request on a background thread."""
    target_and_args = self._build_async_processing_data(data, remote_address)
    worker = Thread(target=self.async_server_processing, args=target_and_args)
    worker.start()

def continue_process(self, model_id):
    """Start second-phase processing for `model_id` on a background thread."""
    target_and_args = self._build_async_processing_data2(model_id)
    worker = Thread(target=self.async_server_processing, args=target_and_args)
    worker.start()

@staticmethod
def async_server_processing(func, *args):
logging.info("process_in_background...")
Expand All @@ -62,30 +72,50 @@ def _build_async_processing_data(self, data, remote_address):
data["remote_address"] = remote_address
return self.federated_learning_wrapper, data

def federated_learning(self, data):
logging.info("Init federated_learning")
def _build_async_processing_data2(self, model_id):
return self.federated_learning_wrapper2, model_id

def federated_learning_first_phase(self, data):
    """Register a new training request from a model buyer (phase one).

    Broadcasts the buyer's data requirements to every data owner and stores a
    GlobalModel shell with empty participant lists; data owners join later via
    link_data_owner_to_model(), which triggers phase two once enough joined.
    (Reconstructed from diff residue: the span interleaved the pre-change
    link/validate/split flow with the post-change deferred-join flow.)
    :param data: dict with the buyer's request — at least model_id, public_key,
                 model_buyer_id, remote_address, model_type, status, step
    """
    model_id = data['model_id']
    try:
        self.data_owner_service.send_requirements(data)
        self.encryption_service.set_public_key(data["public_key"])
        model = self.initialize_global_model(data)
        self.global_models[model_id] = GlobalModel(model_id=model_id,
                                                   buyer_id=data["model_buyer_id"],
                                                   buyer_host=data["remote_address"],
                                                   model_type=data['model_type'],
                                                   model_status=data["status"],
                                                   data_owners=[],
                                                   local_trainers=[],
                                                   validators=[],
                                                   model=model,
                                                   initial_mse=None,
                                                   mse=None,
                                                   public_key=data["public_key"],
                                                   partial_MSEs=None,
                                                   step=data["step"])
    except Exception as e:
        # NOTE(review): logging.exception would keep the traceback; kept as
        # logging.error(e) to preserve the current log format.
        logging.error(e)
        self.send_error_to_model_buyer(model_id)

def link_data_owner_to_model(self, model_id, data_owner_id):
    """Link a registered data owner to a model's training set of participants.

    Starts phase two of the training once at least MIN_DATA_OWNERS owners
    have joined.
    :param model_id: String id of a model created in phase one
    :param data_owner_id: String id of the data owner accepting the training
    """
    linked_data_owners = self.global_models[model_id].data_owners
    if data_owner_id in self.data_owner_service.data_owners:
        linked_data_owners.append(self.data_owner_service.data_owners[data_owner_id])
        # Log inside the branch: previously this logged "Adding data owner"
        # even when the id was unknown and nothing was added.
        logging.info("Adding data owner to training, number {}".format(len(linked_data_owners)))
    else:
        logging.warning("Data owner {} is not registered; not linked to model {}".format(data_owner_id, model_id))
    # NOTE(review): `>=` re-triggers phase two for every owner that joins past
    # the threshold — confirm whether `==` (fire exactly once) is intended.
    if len(linked_data_owners) >= self.config['MIN_DATA_OWNERS']:
        self.continue_process(model_id)

def federated_learning_second_phase(self, model_id):
logging.info("Init federated_learning")
try:
model_data = self.global_models[model_id]
self.validate_linked_data_owners(model_data.data_owners, model_id)
local_trainers, validators = self.split_data_owners(model_data.data_owners)
model_data.local_trainers = local_trainers
model_data.validators = validators
logging.info('Running distributed gradient aggregation for {:d} iterations'.format(self.n_iter))

diffs = self.data_owner_service.get_model_metrics_from_validators(model_data)
metrics_handler = MetricsHandler(diffs[0].size)
Expand Down Expand Up @@ -259,10 +289,6 @@ def send_mses_to_model_buyer(self, model_id, mse, partial_mses, noise, first_upd
}
return self.model_buyer_connector.send_mses(message)['ok']


def federated_learning_wrapper(self, data):
return self.federated_learning(data)

def federated_averaging(self, updates, model_data):
"""
Sum all de partial updates and
Expand Down Expand Up @@ -318,7 +344,5 @@ def calculate_contributions(self, model_id, mse, initial_mse, partial_MSEs):
for data_owner in partial_MSEs:
contributions[data_owner] = (contributions[data_owner]) / contributions_sum
return {'model_id': model_id, 'improvement': improvement, 'contributions': contributions}
#return model_id, improvement, contributions



0 comments on commit d681c4e

Please sign in to comment.