On the consumer side, the service calls what the provider exposes purely through the shared service interface, so the consumer has no implementation class of that interface.
@RestController
@RequestMapping("/consumer")
public class CountryController {
@DubboReference(version = "2.0",lazy = true)
CountryService countryService;
@GetMapping("/getCountry")
public JSONObject getCountry() {
JSONObject rtn = new JSONObject();
rtn.put("msg",countryService.getCountry());
return rtn;
}
}
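- The @DubboReference annotation declares the target interface CountryService as a field of CountryController. When CountryController is parsed, all of its fields are collected and those carrying @DubboReference are singled out for processing.
- The first step yields the type of the shared service interface. When the RootBeanDefinition is built, its class attribute is set to ReferenceBean, and CountryService is finally registered in the IoC container.
- A proxy for the shared service interface is generated through JdkDynamicAopProxy.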
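The first step above (picking out the annotated fields) can be sketched with plain reflection. This is a minimal illustration, not the code of ReferenceAnnotationBeanPostProcessor: the @Ref annotation, the nested CountryService and CountryController types, and referencedTypes() are hypothetical stand-ins.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class ReferenceFieldScan {
    // Hypothetical stand-in for @DubboReference.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Ref {
        String version() default "";
    }

    // Hypothetical service interface and controller.
    interface CountryService {
        String getCountry();
    }

    static class CountryController {
        @Ref(version = "2.0")
        CountryService countryService;

        String unrelatedField;
    }

    // Collects the declared types of all fields that carry @Ref.
    static List<Class<?>> referencedTypes(Class<?> beanClass) {
        List<Class<?>> types = new ArrayList<>();
        for (Field field : beanClass.getDeclaredFields()) {
            if (field.isAnnotationPresent(Ref.class)) {
                types.add(field.getType());
            }
        }
        return types;
    }

    public static void main(String[] args) {
        System.out.println(referencedTypes(CountryController.class));
    }
}
```

Only the field type is needed at this point: it is the interface for which a bean definition is registered in the second step.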
1.ReferenceAnnotationBeanPostProcessor
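The ReferenceAnnotationBeanPostProcessor post-processor resets the class attribute of the bean definition registered for the target interface CountryService to ReferenceBean.
When Spring instantiates and initializes the beans in the IoC container, ReferenceBean's FactoryBean behavior kicks in and takes over management of the CountryService bean.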
public class ReferenceBean<T> implements FactoryBean<T>{
@Override
public T getObject() {
if (lazyProxy == null) {
// create the proxy for the target interface
createLazyProxy();
}
return (T) lazyProxy;
}
private void createLazyProxy() {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
proxyFactory.addInterface(interfaceClass);
Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
for (Class<?> anInterface : internalInterfaces) {
proxyFactory.addInterface(anInterface);
}
if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) {
Class<?> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
proxyFactory.addInterface(serviceInterface);
}
// JDK interface-based proxy
this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}
private Object getCallProxy() throws Exception {
// use ReferenceConfig to initialize the lazyTarget field
return referenceConfig.get();
}
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
@Override
protected Object createObject() throws Exception {
return getCallProxy();
}
@Override
public synchronized Class<?> getTargetClass() {
return getInterfaceClass();
}
}
}
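The FactoryBean idea behind ReferenceBean can be illustrated without Spring. The sketch below is a minimal, hypothetical container: MiniContainer, ObjectFactory, and ReferenceFactory are invented for illustration and are not Spring or Dubbo APIs. A bean registered as a factory is resolved to the factory's product, not to the factory itself.

```java
import java.util.HashMap;
import java.util.Map;

class MiniContainer {
    // Hypothetical, simplified counterpart of Spring's FactoryBean.
    interface ObjectFactory<T> {
        T getObject();
    }

    interface CountryService {
        String getCountry();
    }

    // Plays the role of ReferenceBean: it is registered in place of the interface.
    static final class ReferenceFactory implements ObjectFactory<CountryService> {
        @Override
        public CountryService getObject() {
            // A real ReferenceBean would return a proxy here; this is a stub.
            return () -> "stubbed-country";
        }
    }

    private final Map<String, Object> definitions = new HashMap<>();

    void register(String name, Object beanOrFactory) {
        definitions.put(name, beanOrFactory);
    }

    // Factories are unwrapped: callers receive the product, never the factory.
    Object getBean(String name) {
        Object bean = definitions.get(name);
        if (bean instanceof ObjectFactory) {
            return ((ObjectFactory<?>) bean).getObject();
        }
        return bean;
    }

    public static void main(String[] args) {
        MiniContainer container = new MiniContainer();
        container.register("countryService", new ReferenceFactory());
        CountryService service = (CountryService) container.getBean("countryService");
        System.out.println(service.getCountry());
    }
}
```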
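When the JDK dynamic proxy for the target interface is generated, pay attention to the lazyTarget field of DubboReferenceLazyInitTargetSource, specifically when it is assigned and when it is used.
Dubbo's TargetSource implementation, DubboReferenceLazyInitTargetSource, controls when lazyTarget is initialized; the mechanism itself comes from the abstract class AbstractLazyCreationTargetSource.
1.1.Proxying the shared service interface CountryService
As shown below, when the proxy of CountryService executes a proxied method, it obtains lazyTarget through the TargetSource.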
class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
// proxies the target method of the target class
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
// obtain the TargetSource (here DubboReferenceLazyInitTargetSource) configured on the ProxyFactory
TargetSource targetSource = this.advised.targetSource;
Object target = null;
...
Object retVal;
// obtains the proxy of the target interface CountryService, i.e. the value of lazyTarget
target = targetSource.getTarget();
Class<?> targetClass = (target != null ? target.getClass() : null);
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
if (chain.isEmpty()) {// true by default
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
...
}
...
return retVal;
}
}
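According to the Javadoc comment, lazyTarget is assigned lazily: the assignment is only triggered when the target is first requested. In practice, however, the assignment was observed to happen already when the proxy for the target interface is created; the reason is not yet clear.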
public abstract class AbstractLazyCreationTargetSource implements TargetSource {
/**
* Returns the lazy-initialized target object,
* creating it on-the-fly if it doesn't exist already.
* @see #createObject()
*/
@Override
public synchronized Object getTarget() throws Exception {
if (this.lazyTarget == null) {
// triggers creation of the CountryService proxy
this.lazyTarget = createObject();
}
return this.lazyTarget;
}
}
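The lazy assignment in getTarget() can be reproduced with a plain JDK proxy. The sketch below is an illustration under assumptions, not Dubbo or Spring code: LazyTargetSource, CREATED, and lazyProxy() are hypothetical names. It shows that creating the proxy does not build the target, while the first method call on the proxy does. Note that toString() also counts as a method call on a JDK proxy, so a debugger or a log statement that renders the proxy can trigger the creation earlier than expected; this is one possible explanation for the early assignment observed above, not a confirmed one.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyTargetDemo {
    interface CountryService {
        String getCountry();
    }

    // Counts how many times a real target has been created.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Plays the role of AbstractLazyCreationTargetSource.
    static final class LazyTargetSource<T> {
        private final Supplier<T> factory;
        private T lazyTarget;

        LazyTargetSource(Supplier<T> factory) {
            this.factory = factory;
        }

        synchronized T getTarget() {
            if (lazyTarget == null) {
                lazyTarget = factory.get();
            }
            return lazyTarget;
        }
    }

    static CountryService lazyProxy() {
        LazyTargetSource<CountryService> source = new LazyTargetSource<CountryService>(() -> {
            CREATED.incrementAndGet();
            return () -> "stubbed-country";
        });
        // Every call on the proxy, including toString(), resolves the target first.
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(source.getTarget(), args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (CountryService) Proxy.newProxyInstance(
                CountryService.class.getClassLoader(),
                new Class<?>[] {CountryService.class},
                handler);
    }

    public static void main(String[] args) {
        CountryService service = lazyProxy();
        System.out.println("targets created after proxy creation: " + CREATED.get());
        service.getCountry();
        System.out.println("targets created after first call: " + CREATED.get());
    }
}
```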
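In summary, generating the proxy for the target interface also involves assigning lazyTarget. Both the proxy of the target interface and the lazyTarget value show up with the class name CountryServiceDubboProxy0. The difference is that the former holds an InvocationHandler of type JdkDynamicAopProxy, while the latter holds one of type InvokerInvocationHandler.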
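The two-layer structure (an outer proxy that forwards to an inner proxy, each with its own InvocationHandler) can be sketched with the JDK alone. RemoteCallHandler and ForwardingHandler below are hypothetical stand-ins for InvokerInvocationHandler and JdkDynamicAopProxy; the returned string is a placeholder, not a real RPC result.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class NestedProxyDemo {
    interface CountryService {
        String getCountry();
    }

    // Inner layer: stands in for the handler that would perform the remote call.
    static final class RemoteCallHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
                case "getCountry": return "country-from-provider";
                case "toString":   return "inner-proxy";
                case "hashCode":   return System.identityHashCode(proxy);
                case "equals":     return proxy == args[0];
                default: throw new UnsupportedOperationException(method.getName());
            }
        }
    }

    // Outer layer: stands in for the handler that forwards to its target.
    static final class ForwardingHandler implements InvocationHandler {
        private final Object target;

        ForwardingHandler(Object target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        }
    }

    static CountryService newProxy(InvocationHandler handler) {
        return (CountryService) Proxy.newProxyInstance(
                CountryService.class.getClassLoader(),
                new Class<?>[] {CountryService.class},
                handler);
    }

    public static void main(String[] args) {
        CountryService inner = newProxy(new RemoteCallHandler());
        CountryService outer = newProxy(new ForwardingHandler(inner));
        System.out.println(outer.getCountry());
        System.out.println(Proxy.getInvocationHandler(outer).getClass().getSimpleName());
        System.out.println(Proxy.getInvocationHandler(inner).getClass().getSimpleName());
    }
}
```

In this sketch both proxies also share one generated proxy class, because the JDK reuses a proxy class for the same class loader and interface list. Whether that is what produced the identical names observed above is not established here.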
1.2.ReferenceConfig
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private transient volatile T ref;
@Override
public T get() {
if (ref == null) {
getScopeModel().getDeployer().start();
synchronized (this) {
if (ref == null) {
init();
}
}
}
return ref;
}
}
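As shown above, ReferenceConfig's core field ref is what the lazyTarget field of the TargetSource ultimately points to.
Open question: when is the field ref assigned?
- Possibly while the JDK proxy for CountryService is generated.
- Possibly while a proxied method of CountryService is executed.
- Possibly while the DubboDeployApplicationListener listener handles its events.
Which of these phases actually assigns ref has not been determined yet.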
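Whichever phase triggers it first, ReferenceConfig#get guards init() with double-checked locking on a volatile field, so the assignment happens at most once. A minimal sketch of that pattern follows; LazyRef and initializationsUnderContention() are hypothetical names, and the Supplier stands in for init().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyRefDemo {
    static final class LazyRef<T> {
        private final Supplier<T> initializer;
        private volatile T ref; // volatile, like ReferenceConfig.ref

        LazyRef(Supplier<T> initializer) {
            this.initializer = initializer;
        }

        T get() {
            if (ref == null) {             // first check, without the lock
                synchronized (this) {
                    if (ref == null) {     // second check, under the lock
                        ref = initializer.get();
                    }
                }
            }
            return ref;
        }
    }

    // Calls get() from several threads and reports how often the initializer ran.
    static int initializationsUnderContention(int threads) {
        AtomicInteger initCount = new AtomicInteger();
        LazyRef<Object> lazy = new LazyRef<Object>(() -> {
            initCount.incrementAndGet();
            return new Object();
        });
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                lazy.get();
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return initCount.get();
    }

    public static void main(String[] args) {
        System.out.println(initializationsUnderContention(8));
    }
}
```

This means all three candidate phases can call get() safely; only the first caller pays the cost of init().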
2.DubboDeployApplicationListener
The core role of this listener is to drive the assignment of the field ref.
public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> implements ModuleDeployer {
private final ModuleConfigManager configManager;
private final SimpleReferenceCache referenceCache;
@Override
public synchronized Future start() throws IllegalStateException {
...
onModuleStarting();
// initialize
applicationDeployer.initialize();
initialize();
// export services
exportServices();
...
// refer services: this is the core step
referServices();
...
return startFuture;
}
private void referServices() {
// fetch from the config manager the target-interface configs introduced by @DubboReference
configManager.getReferences().forEach(rc -> {
ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
...
referenceCache.get(rc);
});
}
}
public class SimpleReferenceCache implements ReferenceCache {
public <T> T get(ReferenceConfigBase<T> rc) {
String key = generator.generateKey(rc);
Class<?> type = rc.getInterfaceClass();
Object proxy = rc.get();// ReferenceConfig initializes the field ref
...
return (T) proxy;
}
}
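2.1.refer in the Protocol interface
The extension classes of the Protocol interface are shown in the figure below:
[Figure: extension classes of the Protocol interface]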
public interface Protocol {
// the service provider exports the target interface
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
// the service consumer refers to the target interface via @DubboReference
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private Protocol protocolSPI;
@Override
public T get() {
if (ref == null) {
// ensure start module, compatible with old api usage
getScopeModel().getDeployer().start();
synchronized (this) {
if (ref == null) {
init();
}
}
}
return ref;
}
protected synchronized void init() {
...
ref = createProxy(referenceParameters);
...
}
private T createProxy(Map<String, String> referenceParameters) {
...
createInvokerForRemote();
...
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
private void createInvokerForRemote() {
// the URL protocol is registry
URL curUrl = urls.get(0);
invoker = protocolSPI.refer(interfaceClass, curUrl);
if (!UrlUtils.isRegistry(curUrl)) {
List<Invoker<?>> invokers = new ArrayList<>();
invokers.add(invoker);
invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
}
}
}
A URL with the registry protocol represents the registry address, i.e. the ZooKeeper IP, port, and other registry-related information.
registry://zk-host:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=66082&qos.enable=false&register-mode=instance&registry=zookeeper&release=3.0.7&timestamp=1710075502746
As the figure shows, among the Protocol extension classes it is RegistryProtocol#refer that actually carries out the refer call here.
2.1.1.RegistryProtocol
public class RegistryProtocol implements Protocol, ScopeModelAware {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// replace the registry protocol with the zookeeper protocol
url = getRegistryUrl(url);
Registry registry = getRegistry(url);
...
Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url, qs);
}
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
...
// consumerUrl carries the consumer's current IP address; it gives the provider an address to respond to
URL consumerUrl = new ServiceConfigURL (
p,null,null,parameters.get(REGISTER_IP_KEY),0, getPath(parameters, type),parameters,consumerAttribute
);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
// migrationInvoker:MigrationInvoker
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
return interceptInvoker(migrationInvoker, url, consumerUrl);
}
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
for (RegistryProtocolListener listener : listeners) {
// by default the MigrationRuleListener listener is present
listener.onRefer(this, invoker, consumerUrl, url);
}
return invoker;
}
}
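The shape of interceptInvoker is a simple notification loop: every listener sees the invoker, and the same instance is returned. The sketch below only illustrates that shape; MigrationLikeInvoker, ReferListener, and the "APPLICATION_FIRST" value are hypothetical, and what MigrationRuleListener really does in onRefer is not shown in the quoted source.

```java
import java.util.Arrays;
import java.util.List;

class ReferListenerDemo {
    // Hypothetical stand-in for the MigrationInvoker passed through the listeners.
    static final class MigrationLikeInvoker {
        String step = "UNDECIDED";
    }

    // Hypothetical stand-in for RegistryProtocolListener.
    interface ReferListener {
        void onRefer(MigrationLikeInvoker invoker);
    }

    // Mirrors the loop in interceptInvoker: notify every listener, return the same invoker.
    static MigrationLikeInvoker intercept(MigrationLikeInvoker invoker, List<ReferListener> listeners) {
        for (ReferListener listener : listeners) {
            listener.onRefer(invoker);
        }
        return invoker;
    }

    public static void main(String[] args) {
        // Assumed behavior: the listener decides something and records it on the invoker.
        ReferListener ruleListener = invoker -> invoker.step = "APPLICATION_FIRST";
        MigrationLikeInvoker result = intercept(new MigrationLikeInvoker(), Arrays.asList(ruleListener));
        System.out.println(result.step);
    }
}
```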
zookeeper protocol:
zookeeper://zk-host:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=66082&qos.enable=false&register-mode=instance&release=3.0.7&timestamp=1710075502746
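Comparing the registry:// and zookeeper:// URLs suggests what the rewrite in RegistryProtocol#refer amounts to: the value of the registry parameter becomes the protocol, and the parameter itself is dropped. The following is a string-level sketch of that transformation, not Dubbo's getRegistryUrl implementation; toRegistryUrl is a hypothetical helper, and the "dubbo" fallback used when no registry parameter is present is an assumption.

```java
import java.util.StringJoiner;

class RegistryUrlRewrite {
    // Rewrites registry://host:port/path?a=b&registry=zookeeper
    // into     zookeeper://host:port/path?a=b
    static String toRegistryUrl(String url) {
        int schemeEnd = url.indexOf("://");
        if (schemeEnd < 0 || !url.substring(0, schemeEnd).equals("registry")) {
            return url; // not a registry:// URL, leave untouched
        }
        int queryStart = url.indexOf('?');
        String location = queryStart < 0
                ? url.substring(schemeEnd)
                : url.substring(schemeEnd, queryStart);
        String query = queryStart < 0 ? "" : url.substring(queryStart + 1);

        String protocol = "dubbo"; // assumed fallback when no registry parameter exists
        StringJoiner kept = new StringJoiner("&");
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            if (pair.startsWith("registry=")) {
                protocol = pair.substring("registry=".length());
            } else {
                kept.add(pair);
            }
        }
        String rest = kept.length() == 0 ? "" : "?" + kept;
        return protocol + location + rest;
    }

    public static void main(String[] args) {
        System.out.println(toRegistryUrl(
                "registry://zk-host:2181/org.apache.dubbo.registry.RegistryService"
                        + "?application=dubbo-consumer&registry=zookeeper&release=3.0.7"));
    }
}
```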
consumer protocol: the consumer does not need to specify a port when registering, because provider and consumer communicate over TCP through Netty (port 20880).
consumer://192.168.1.6/common.service.CountryService?application=dubbo-consumer&background=false&dubbo=2.0.2&interface=common.service.CountryService&lazy=true&methods=getCountry&pid=66082&qos.enable=false&register-mode=instance&register.ip=192.168.1.6&release=3.0.7&revision=2.0&side=consumer&sticky=false&timestamp=1710075502715&version=2.0
3.ServiceDiscoveryRegistry
public class ServiceDiscoveryRegistry extends FailbackRegistry {
private final AbstractServiceNameMapping serviceNameMapping;
@Override
public void doSubscribe(URL url, NotifyListener listener) {
url = addRegistryClusterKey(url);
serviceDiscovery.subscribe(url, listener);
boolean check = url.getParameter(CHECK_KEY, false);
String key = ServiceNameMapping.buildMappingKey(url);
Lock mappingLock = serviceNameMapping.getMappingLock(key);
mappingLock.lock();
Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
subscribeURLs(url, listener, subscribedServices);
}
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
serviceNames = toTreeSet(serviceNames);
String serviceNamesKey = toStringKeys(serviceNames);
String protocolServiceKey = url.getProtocolServiceKey();
logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, protocolServiceKey));
// register ServiceInstancesChangedListener
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
try {
appSubscriptionLock.lock();
ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
if (serviceInstancesChangedListener == null) {
serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
serviceInstancesChangedListener.setUrl(url);
for (String serviceName : serviceNames) {
// fetch the IP, port, and other details of the provider instances
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
if (CollectionUtils.isNotEmpty(serviceInstances)) {
serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
}
if (!serviceInstancesChangedListener.isDestroyed()) {
serviceInstancesChangedListener.setUrl(url);
listener.addServiceListener(serviceInstancesChangedListener);
serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
} else {
logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
serviceListeners.remove(serviceNamesKey);
}
} finally {
appSubscriptionLock.unlock();
}
}
}
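The core of subscribeURLs is a get-or-create under a per-key lock: one listener per sorted set of application names, shared by every service key that maps to those applications. The sketch below shows only that pattern; AppSubscriptionDemo and InstancesListener are hypothetical, and joining the sorted names with a comma is an assumption about how the key is built.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class AppSubscriptionDemo {
    // Hypothetical stand-in for ServiceInstancesChangedListener.
    static final class InstancesListener {
        final Set<String> serviceNames;
        final Set<String> protocolServiceKeys = new TreeSet<>();

        InstancesListener(Set<String> serviceNames) {
            this.serviceNames = serviceNames;
        }
    }

    private final Map<String, Lock> appLocks = new ConcurrentHashMap<>();
    private final Map<String, InstancesListener> serviceListeners = new ConcurrentHashMap<>();

    InstancesListener subscribe(Set<String> serviceNames, String protocolServiceKey) {
        // Sorting makes {"b", "a"} and {"a", "b"} map to the same key.
        Set<String> sorted = new TreeSet<>(serviceNames);
        String key = String.join(",", sorted);

        Lock lock = appLocks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock(); // acquire before the try block so unlock() only runs when the lock is held
        try {
            InstancesListener listener = serviceListeners.get(key);
            if (listener == null) {
                listener = new InstancesListener(sorted);
                serviceListeners.put(key, listener);
            }
            listener.protocolServiceKeys.add(protocolServiceKey);
            return listener;
        } finally {
            lock.unlock();
        }
    }
}
```

Two subscriptions whose application names differ only in order end up on the same listener, which is the point of normalizing the names before building the key.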
public abstract class AbstractServiceNameMapping implements ServiceNameMapping, ScopeModelAware {
@Override
public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {
String key = ServiceNameMapping.buildMappingKey(subscribedURL);
// use previously cached services.
Set<String> mappingServices = this.getCachedMapping(key);
// Asynchronously register listener in case previous cache does not exist or cache expired.
if (CollectionUtils.isEmpty(mappingServices)) {
...
} else {
ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));
}
return mappingServices;
}
private class AsyncMappingTask implements Callable<Set<String>> {
private final MappingListener listener;
private final URL subscribedURL;
private final boolean notifyAtFirstTime;
public AsyncMappingTask(MappingListener listener, URL subscribedURL, boolean notifyAtFirstTime) {
this.listener = listener;
this.subscribedURL = subscribedURL;
this.notifyAtFirstTime = notifyAtFirstTime;
}
@Override
public Set<String> call() throws Exception {
synchronized (mappingListeners) {
Set<String> mappedServices = emptySet();
try {
String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
if (listener != null) {
// fetch the set of provider application names through the zk client
mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());
listeners.add(listener);
if (CollectionUtils.isNotEmpty(mappedServices)) {
if (notifyAtFirstTime) {
// add the provider application names to the local cache serviceNameMapping
// DefaultMappingListener keeps the provider application names consistent between the local cache and the zk server
listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
}
}
} else {
mappedServices = get(subscribedURL);
if (CollectionUtils.isNotEmpty(mappedServices)) {
AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
}
}
} catch (Exception e) {
logger.error("Failed getting mapping info from remote center. ", e);
}
return mappedServices;
}
}
}
}
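The caching strategy of getAndListen can be summarized as "answer from the cache now, refresh in the background". The sketch below illustrates that idea with JDK types only. MappingCacheDemo, remoteLookup, and the application name "dubbo-provider" are hypothetical, and loading synchronously on a cache miss is an assumption, since the quoted source elides that branch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

class MappingCacheDemo {
    private final Map<String, Set<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, Set<String>> remoteLookup; // stands in for the zk client
    private final ExecutorService executor;

    MappingCacheDemo(Function<String, Set<String>> remoteLookup, ExecutorService executor) {
        this.remoteLookup = remoteLookup;
        this.executor = executor;
    }

    // Returns cached application names immediately; on a hit, refreshes asynchronously.
    Set<String> getAndListen(String key, Consumer<Set<String>> listener) {
        Set<String> cached = cache.get(key);
        if (cached == null || cached.isEmpty()) {
            // Assumption: load synchronously on a miss, without notifying the listener.
            return refresh(key, listener, false);
        }
        Callable<Set<String>> task = () -> refresh(key, listener, true);
        executor.submit(task);
        return cached;
    }

    private Set<String> refresh(String key, Consumer<Set<String>> listener, boolean notifyAtFirstTime) {
        Set<String> names = new TreeSet<>(remoteLookup.apply(key));
        if (!names.isEmpty()) {
            cache.put(key, names);
            if (notifyAtFirstTime) {
                listener.accept(names);
            }
        }
        return names;
    }

    // Runs one miss and one hit, then waits for the background refresh to finish.
    static List<String> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        MappingCacheDemo mapping = new MappingCacheDemo(
                key -> Collections.singleton("dubbo-provider"), executor);
        mapping.getAndListen("common.service.CountryService", names -> events.add("first " + names));
        mapping.getAndListen("common.service.CountryService", names -> events.add("second " + names));
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch only the second call notifies its listener, because only the background refresh runs with notifyAtFirstTime set to true.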
NettyChannel is where the data is actually sent through Netty.
CodecSupport covers the protocol handling related to Netty.