在前一章中, 学习了所有的流水线处理。最后通过路由的设置,会把创建Instance的请求映射到servers.py的Controller的create方法。这中间会有不少的代码, 但都是一些常用的逻辑, 这里不进行讲述。
Controller/Create
这是一段比较长的代码, 但是做的事情还是比较简单: 先对各种参数进行合法性检查, 最后调用compute_api的create方法去创建Instance。
@wsgi.response(202)
@wsgi.serializers(xml=FullServerTemplate)
@wsgi.deserializers(xml=CreateDeserializer)
def create(self, req, body):
    """Creates a new server for a given user.

    Validates the request body, gathers the optional parameters whose API
    extensions are loaded, and passes everything to
    ``self.compute_api.create``.

    :param req: the WSGI request; ``req.environ['nova.context']`` supplies
        the request context
    :param body: the deserialized request body; must contain a 'server'
        entry
    :returns: a ``wsgi.ResponseObject`` carrying only the reservation id
        when 'os-multiple-create' is loaded and the request sets
        'return_reservation_id'; otherwise the result of
        ``self._add_location`` for the view of the first created instance
    :raises exc.HTTPUnprocessableEntity: if ``is_valid_body`` rejects body
    :raises exc.HTTPBadRequest: for a missing name, malformed networks,
        an invalid flavorRef, invalid block device mappings, invalid
        counts, and the compute API errors mapped below
    :raises exc.HTTPRequestEntityTooLarge: on QuotaError or
        InvalidMetadataSize from the compute API
    :raises exc.HTTPConflict: on PortInUse from the compute API
    """
    # NOTE(review): this listing was reformatted from a whitespace-collapsed
    # copy, so the block nesting below is reconstructed -- confirm it
    # against the upstream file before relying on it.
    if not self.is_valid_body(body, 'server'):
        raise exc.HTTPUnprocessableEntity()

    # Request context produced by the earlier pipeline stages.
    context = req.environ['nova.context']
    # All parameters of the requested server.
    server_dict = body['server']
    # Admin password of the instance.
    password = self._get_server_admin_password(server_dict)

    # The instance name is mandatory.
    if 'name' not in server_dict:
        msg = _("Server name is not defined")
        raise exc.HTTPBadRequest(explanation=msg)

    name = server_dict['name']
    # Validate the name length (1-255).
    self._validate_server_name(name)
    name = name.strip()

    # Image identifier taken from the request; it may be empty.
    image_uuid = self._image_from_req_data(body)

    personality = server_dict.get('personality')

    config_drive = None
    if self.ext_mgr.is_loaded('os-config-drive'):
        config_drive = server_dict.get('config_drive')

    injected_files = []
    if personality:
        # Files that have to be injected into the instance.
        injected_files = self._get_injected_files(personality)

    # Security group configuration.
    sg_names = []
    if self.ext_mgr.is_loaded('os-security-groups'):
        security_groups = server_dict.get('security_groups')
        if security_groups is not None:
            sg_names = [sg['name'] for sg in security_groups
                        if sg.get('name')]
    # Fall back to the 'default' group when none was named.
    if not sg_names:
        sg_names.append('default')

    # Drop duplicate group names.
    sg_names = list(set(sg_names))

    # Network configuration.
    requested_networks = None
    if (self.ext_mgr.is_loaded('os-networks')
            or utils.is_neutron()):
        requested_networks = server_dict.get('networks')

    if requested_networks is not None:
        if not isinstance(requested_networks, list):
            expl = _('Bad networks format')
            raise exc.HTTPBadRequest(explanation=expl)
        requested_networks = self._get_requested_networks(
            requested_networks)

    # IPv4 access address and its validity check. Per the original note,
    # only the address itself is checked; addresses used elsewhere are not
    # considered. The trailing comma builds a 1-tuple that is unpacked
    # right away, so this is a plain assignment.
    (access_ip_v4, ) = server_dict.get('accessIPv4'),
    if access_ip_v4 is not None:
        self._validate_access_ipv4(access_ip_v4)

    # IPv6 access address and its validity check (address itself only).
    (access_ip_v6, ) = server_dict.get('accessIPv6'),
    if access_ip_v6 is not None:
        self._validate_access_ipv6(access_ip_v6)

    try:
        # Flavor (instance configuration); this one is required.
        flavor_id = self._flavor_id_from_req_data(body)
    except ValueError as error:
        msg = _("Invalid flavorRef provided.")
        raise exc.HTTPBadRequest(explanation=msg)

    # optional openstack extensions:
    key_name = None
    if self.ext_mgr.is_loaded('os-keypairs'):
        key_name = server_dict.get('key_name')

    user_data = None
    if self.ext_mgr.is_loaded('os-user-data'):
        user_data = server_dict.get('user_data')
    self._validate_user_data(user_data)

    # Availability zone; per the original note it is used to partition
    # instances, and the partitioning is host based.
    availability_zone = None
    if self.ext_mgr.is_loaded('os-availability-zone'):
        availability_zone = server_dict.get('availability_zone')

    # Block device (disk) information; per the original note a device
    # can serve as the boot disk.
    block_device_mapping = None
    block_device_mapping_v2 = None
    legacy_bdm = True
    if self.ext_mgr.is_loaded('os-volumes'):
        block_device_mapping = server_dict.get('block_device_mapping', [])
        for bdm in block_device_mapping:
            try:
                block_device.validate_device_name(bdm.get("device_name"))
                block_device.validate_and_default_volume_size(bdm)
            except exception.InvalidBDMFormat as e:
                raise exc.HTTPBadRequest(explanation=e.format_message())

            # Normalize the flag in place via strutils.bool_from_string.
            if 'delete_on_termination' in bdm:
                bdm['delete_on_termination'] = strutils.bool_from_string(
                    bdm['delete_on_termination'])

        if self.ext_mgr.is_loaded('os-block-device-mapping-v2-boot'):
            # Consider the new data format for block device mapping
            block_device_mapping_v2 = server_dict.get(
                'block_device_mapping_v2', [])
            # NOTE (ndipanov): Disable usage of both legacy and new
            # block device format in the same request
            if block_device_mapping and block_device_mapping_v2:
                expl = _('Using different block_device_mapping syntaxes '
                         'is not allowed in the same request.')
                raise exc.HTTPBadRequest(explanation=expl)

            # Assume legacy format
            legacy_bdm = not bool(block_device_mapping_v2)

            try:
                block_device_mapping_v2 = [
                    block_device.BlockDeviceDict.from_api(bdm_dict)
                    for bdm_dict in block_device_mapping_v2]
            except exception.InvalidBDMFormat as e:
                raise exc.HTTPBadRequest(explanation=e.format_message())

    # Whichever format was supplied (legacy first) is passed on.
    block_device_mapping = (block_device_mapping or
                            block_device_mapping_v2)

    ret_resv_id = False
    # min_count and max_count are optional. If they exist, they may come
    # in as strings. Verify that they are valid integers and > 0.
    # Also, we want to default 'min_count' to 1, and default
    # 'max_count' to be 'min_count'.
    min_count = 1
    max_count = 1
    if self.ext_mgr.is_loaded('os-multiple-create'):
        ret_resv_id = server_dict.get('return_reservation_id', False)
        min_count = server_dict.get('min_count', 1)
        max_count = server_dict.get('max_count', min_count)

    try:
        min_count = utils.validate_integer(
            min_count, "min_count", min_value=1)
        max_count = utils.validate_integer(
            max_count, "max_count", min_value=1)
    except exception.InvalidInput as e:
        raise exc.HTTPBadRequest(explanation=e.format_message())

    if min_count > max_count:
        msg = _('min_count must be <= max_count')
        raise exc.HTTPBadRequest(explanation=msg)

    auto_disk_config = False
    if self.ext_mgr.is_loaded('OS-DCF'):
        auto_disk_config = server_dict.get('auto_disk_config')

    scheduler_hints = {}
    if self.ext_mgr.is_loaded('OS-SCH-HNT'):
        scheduler_hints = server_dict.get('scheduler_hints', {})

    try:
        _get_inst_type = flavors.get_flavor_by_flavor_id
        inst_type = _get_inst_type(flavor_id, ctxt=context,
                                   read_deleted="no")
        # Use compute_api to create the instance(s).
        (instances, resv_id) = self.compute_api.create(context,
                    inst_type,
                    image_uuid,
                    display_name=name,
                    display_description=name,
                    key_name=key_name,
                    metadata=server_dict.get('metadata', {}),
                    access_ip_v4=access_ip_v4,
                    access_ip_v6=access_ip_v6,
                    injected_files=injected_files,
                    admin_password=password,
                    min_count=min_count,
                    max_count=max_count,
                    requested_networks=requested_networks,
                    security_group=sg_names,
                    user_data=user_data,
                    availability_zone=availability_zone,
                    config_drive=config_drive,
                    block_device_mapping=block_device_mapping,
                    auto_disk_config=auto_disk_config,
                    scheduler_hints=scheduler_hints,
                    legacy_bdm=legacy_bdm)
    # Translate compute-layer errors into HTTP errors.
    except exception.QuotaError as error:
        raise exc.HTTPRequestEntityTooLarge(
            explanation=error.format_message(),
            headers={'Retry-After': 0})
    except exception.InvalidMetadataSize as error:
        raise exc.HTTPRequestEntityTooLarge(
            explanation=error.format_message())
    except exception.ImageNotFound as error:
        msg = _("Can not find requested image")
        raise exc.HTTPBadRequest(explanation=msg)
    except exception.FlavorNotFound as error:
        msg = _("Invalid flavorRef provided.")
        raise exc.HTTPBadRequest(explanation=msg)
    except exception.KeypairNotFound as error:
        msg = _("Invalid key_name provided.")
        raise exc.HTTPBadRequest(explanation=msg)
    except exception.ConfigDriveInvalidValue:
        msg = _("Invalid config_drive provided.")
        raise exc.HTTPBadRequest(explanation=msg)
    except rpc_common.RemoteError as err:
        msg = "%(err_type)s: %(err_msg)s" % {'err_type': err.exc_type,
                                             'err_msg': err.value}
        raise exc.HTTPBadRequest(explanation=msg)
    except UnicodeDecodeError as error:
        msg = "UnicodeError: %s" % unicode(error)
        raise exc.HTTPBadRequest(explanation=msg)
    except (exception.ImageNotActive,
            exception.InstanceTypeDiskTooSmall,
            exception.InstanceTypeMemoryTooSmall,
            exception.InstanceTypeNotFound,
            exception.InvalidMetadata,
            exception.InvalidRequest,
            exception.MultiplePortsNotApplicable,
            exception.PortNotFound,
            exception.SecurityGroupNotFound,
            exception.InvalidBDM) as error:
        raise exc.HTTPBadRequest(explanation=error.format_message())
    except exception.PortInUse as error:
        raise exc.HTTPConflict(explanation=error.format_message())

    # If the caller wanted a reservation_id, return it
    if ret_resv_id:
        return wsgi.ResponseObject({'reservation_id': resv_id},
                                   xml=ServerMultipleCreateTemplate)

    req.cache_db_instances(instances)
    # Only the first created instance is rendered in the response.
    server = self._view_builder.create(req, instances[0])

    if CONF.enable_instance_password:
        server['server']['adminPass'] = password

    robj = wsgi.ResponseObject(server)

    return self._add_location(robj)
compute_api
为了简化问题, 这里假设系统中没有使用cells, 那么对应的compute_api就是nova/compute/api.py的API类。
create
@hooks.add_hook("create_instance")
def create(self, context, instance_type,
           image_href, kernel_id=None, ramdisk_id=None,
           min_count=None, max_count=None,
           display_name=None, display_description=None,
           key_name=None, key_data=None, security_group=None,
           availability_zone=None, user_data=None, metadata=None,
           injected_files=None, admin_password=None,
           block_device_mapping=None, access_ip_v4=None,
           access_ip_v6=None, requested_networks=None, config_drive=None,
           auto_disk_config=None, scheduler_hints=None, legacy_bdm=True):
    """Check the create policies, then delegate to ``_create_instance``.

    When networks were requested, more than one instance may be created
    and neutron is in use, the requested networks are additionally passed
    through ``_check_multiple_instances_neutron_ports``.

    Returns whatever ``_create_instance`` returns, i.e. a tuple of
    (instances, reservation_id).
    """
    # Policy check for the create request.
    self._check_create_policies(context, availability_zone,
                                requested_networks, block_device_mapping)

    # Nested guards keep the original left-to-right short-circuit order.
    if requested_networks:
        if max_count > 1:
            if utils.is_neutron():
                self._check_multiple_instances_neutron_ports(
                    requested_networks)

    # Hand over to our own _create_instance.
    return self._create_instance(
        context, instance_type,
        image_href, kernel_id, ramdisk_id,
        min_count, max_count,
        display_name, display_description,
        key_name, key_data, security_group,
        availability_zone, user_data, metadata,
        injected_files, admin_password,
        access_ip_v4, access_ip_v6,
        requested_networks, config_drive,
        block_device_mapping, auto_disk_config,
        scheduler_hints=scheduler_hints,
        legacy_bdm=legacy_bdm)
_create_instance
def _create_instance(self, context, instance_type,
                     image_href, kernel_id, ramdisk_id,
                     min_count, max_count,
                     display_name, display_description,
                     key_name, key_data, security_groups,
                     availability_zone, user_data, metadata,
                     injected_files, admin_password,
                     access_ip_v4, access_ip_v6,
                     requested_networks, config_drive,
                     block_device_mapping, auto_disk_config,
                     reservation_id=None, scheduler_hints=None,
                     legacy_bdm=True):
    """Validate the inputs and schedule the instance(s) for creation.

    The parameters are normalized and validated independently of the
    provisioning strategy, the instances are provisioned through
    ``_provision_instances``, and the build request is handed to
    ``self.compute_task_api.build_instances``.

    Returns a tuple of (instances, reservation_id).
    """
    # NOTE(review): formatted from a whitespace-collapsed listing; the
    # nesting of the image branch is reconstructed -- confirm upstream.

    # Fill in defaults for everything the caller left empty.
    if reservation_id is None:
        reservation_id = utils.generate_uid('r')
    if not security_groups:
        security_groups = ['default']
    if not min_count:
        min_count = 1
    if not max_count:
        max_count = min_count
    if not block_device_mapping:
        block_device_mapping = []
    if not instance_type:
        instance_type = flavors.get_default_flavor()

    if not image_href:
        # No image given: take the metadata from the block device mapping.
        image_id = None
        boot_meta = {
            'properties': self._get_bdm_image_metadata(
                context, block_device_mapping, legacy_bdm),
        }
    else:
        image_id, boot_meta = self._get_image(context, image_href)

    self._check_auto_disk_config(image=boot_meta,
                                 auto_disk_config=auto_disk_config)

    availability_zone, forced_host, forced_node = (
        self._handle_availability_zone(context, availability_zone))

    base_options = self._validate_and_build_base_options(
        context, instance_type, boot_meta, image_href, image_id,
        kernel_id, ramdisk_id, display_name, display_description,
        key_name, key_data, security_groups, availability_zone,
        forced_host, user_data, metadata, injected_files,
        access_ip_v4, access_ip_v6, requested_networks, config_drive,
        block_device_mapping, auto_disk_config, reservation_id)

    block_device_mapping = self._check_and_transform_bdm(
        base_options, boot_meta, min_count, max_count,
        block_device_mapping, legacy_bdm)

    # Create the database records.
    instances = self._provision_instances(
        context, instance_type, min_count, max_count, base_options,
        boot_meta, security_groups, block_device_mapping)

    # Build the filter properties.
    filter_properties = self._build_filter_properties(
        context, scheduler_hints, forced_host, forced_node, instance_type)

    # Record the start of the CREATE action for every instance.
    for new_instance in instances:
        self._record_action_start(context, new_instance,
                                  instance_actions.CREATE)

    # Use compute_task_api to build the instances; legacy_bdm is passed
    # as a literal False here, regardless of this method's own argument.
    self.compute_task_api.build_instances(
        context,
        instances=instances,
        image=boot_meta,
        filter_properties=filter_properties,
        admin_password=admin_password,
        injected_files=injected_files,
        requested_networks=requested_networks,
        security_groups=security_groups,
        block_device_mapping=block_device_mapping,
        legacy_bdm=False)

    return instances, reservation_id
compute_task_api(conductor)/build_instances
def build_instances(self, context, instances, image, filter_properties,
                    admin_password, injected_files, requested_networks,
                    security_groups, block_device_mapping, legacy_bdm=True):
    """Forward the build request to ``self.conductor_compute_rpcapi``.

    Every argument except ``context`` is passed on by keyword, unchanged.
    Nothing is returned.
    """
    forwarded = {
        'instances': instances,
        'image': image,
        'filter_properties': filter_properties,
        'admin_password': admin_password,
        'injected_files': injected_files,
        'requested_networks': requested_networks,
        'security_groups': security_groups,
        'block_device_mapping': block_device_mapping,
        'legacy_bdm': legacy_bdm,
    }
    self.conductor_compute_rpcapi.build_instances(context, **forwarded)
conductor_compute_rpcapi(ComputeTaskAPI)/build_instances
def build_instances(self, context, instances, image, filter_properties,
                    admin_password, injected_files, requested_networks,
                    security_groups, block_device_mapping, legacy_bdm=True):
    """Delegate to ``build_instances`` on ``self.conductor_compute_rpcapi``.

    All arguments after ``context`` are handed over as keyword arguments
    with their values untouched. Nothing is returned.
    """
    delegate = self.conductor_compute_rpcapi.build_instances
    delegate(
        context,
        instances=instances,
        image=image,
        filter_properties=filter_properties,
        admin_password=admin_password,
        injected_files=injected_files,
        requested_networks=requested_networks,
        security_groups=security_groups,
        block_device_mapping=block_device_mapping,
        legacy_bdm=legacy_bdm,
    )
conductor_compute_rpcapi(conductor/rpcapi/ConductorAPI)
def build_instances(self, context, instances, image, filter_properties,
                    admin_password, injected_files, requested_networks,
                    security_groups, block_device_mapping, legacy_bdm=True):
    """Cast a 'build_instances' message through ``self.client``.

    The instances and the image are first converted with
    ``jsonutils.to_primitive``; the remaining arguments are sent as they
    are. The client is prepared with version '1.5'. Nothing is returned.
    """
    primitive_instances = [jsonutils.to_primitive(one_instance)
                           for one_instance in instances]
    primitive_image = jsonutils.to_primitive(image)
    payload = {
        'instances': primitive_instances,
        'image': primitive_image,
        'filter_properties': filter_properties,
        'admin_password': admin_password,
        'injected_files': injected_files,
        'requested_networks': requested_networks,
        'security_groups': security_groups,
        'block_device_mapping': block_device_mapping,
        'legacy_bdm': legacy_bdm,
    }
    call_context = self.client.prepare(version='1.5')
    call_context.cast(context, 'build_instances', **payload)
至此, 本地的Conductor API通过RPC把消息发送给远端的Conductor服务, 然后再交给Scheduler去处理。
但是这里还有一个比较重要的过程, 就是RPC机制, 下一章中将继续讲解。