Skip to end of metadata
Go to start of metadata

Parameter Callback

(+) (#)

参数回调方式与调用本地callback或listener相同,只需要在Spring的配置文件中声明哪个参数是callback类型即可,Dubbo将基于长连接生成反向代理,这样就可以从服务器端调用客户端逻辑。
2.0.6及其以上版本支持

代码参见:http://code.alibabatech.com/svn/dubbo/trunk/dubbo-test/dubbo-test-examples/src/main/java/com/alibaba/dubbo/examples/callback

(1) 共享服务接口:

服务接口示例:

CallbackService.java
package com.callback;
 
/**
 * Service interface shared by the provider and the consumer in this callback example.
 */
public interface CallbackService {
    /**
     * Registers a listener that the provider may call back later.
     *
     * @param key      identifier passed along with the listener; the example
     *                 implementation on this page stores the listener under it
     * @param listener callback object; the provider configuration on this page
     *                 marks this argument as {@code callback="true"}
     */
    void addListener(String key, CallbackListener listener);
}
CallbackListener.java
package com.callback;
 
/**
 * Callback contract implemented on the consumer side and invoked by the provider.
 */
public interface CallbackListener {
    /**
     * Receives a change notification.
     *
     * @param msg notification text produced by the caller of this callback
     */
    void changed(String msg);
}

(2) 服务提供者:

服务提供者接口实现示例:

CallbackServiceImpl.java
package com.callback.impl;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
import com.callback.CallbackListener;
import com.callback.CallbackService;
 
public class CallbackServiceImpl implements CallbackService {
     
    private final Map<String, CallbackListener> listeners = new ConcurrentHashMap<String, CallbackListener>();
  
    public CallbackServiceImpl() {
        Thread t = new Thread(new Runnable() {
            public void run() {
                while(true) {
                    try {
                        for(Map.Entry<String, CallbackListener> entry : listeners.entrySet()){
                           try {
                               entry.getValue().changed(getChanged(entry.getKey()));
                           } catch (Throwable t) {
                               listeners.remove(entry.getKey());
                           }
                        }
                        Thread.sleep(5000); // 定时触发变更通知
                    } catch (Throwable t) { // 防御容错
                        t.printStackTrace();
                    }
                }
            }
        });
        t.setDaemon(true);
        t.start();
    }
  
    public void addListener(String key, CallbackListener listener) {
        listeners.put(key, listener);
        listener.changed(getChanged(key)); // 发送变更通知
    }
     
    private String getChanged(String key) {
        return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }
 
}

服务提供者配置示例:

<bean id="callbackService" class="com.callback.impl.CallbackServiceImpl" />
<dubbo:service interface="com.callback.CallbackService" ref="callbackService" connections="1" callbacks="1000">
    <dubbo:method name="addListener">
        <!-- index is zero-based: argument 1 of addListener(String key, CallbackListener listener) is the listener -->
        <dubbo:argument index="1" callback="true" />
        <!-- Alternatively, the callback argument can be selected by its type: -->
        <!--<dubbo:argument type="com.callback.CallbackListener" callback="true" />-->
    </dubbo:method>
</dubbo:service>

(3) 服务消费者:

服务消费者配置示例:

consumer.xml
<!-- Consumer-side reference to com.callback.CallbackService; the calling code looks it up by the bean id "callbackService". -->
<dubbo:reference id="callbackService" interface="com.callback.CallbackService" />

服务消费者调用示例:

CallbackServiceTest.java
// Load and start the consumer-side Spring context defined in consumer.xml.
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:consumer.xml");
ctx.start();

// The callback that will be handed to the service; it prints every message it receives.
CallbackListener printer = new CallbackListener() {
    public void changed(String msg) {
        System.out.println("callback1:" + msg);
    }
};

// Look up the service reference by bean id and register the callback.
CallbackService service = (CallbackService) ctx.getBean("callbackService");
service.addListener("http://10.20.160.198/wiki/display/dubbo/foo.bar", printer);
Labels:
None
Enter labels to add to this page:
Please wait 
Looking for a label? Just start typing.
  1. 一月 29, 2012

    Anonymous

    我参照例子写的,但是失败.

    callbackService.addListener("http://10.20.160.198/wiki/display/dubbo/foo.bar", new CallbackListener() {
         public void changed(String msg) {
             System.out.println("callback1:" + msg);
         }
    });

    这一步就报错了。提示尝试3次,服务端无返回错误。
    CallbackListener需要继承序列化接口吗?dubbo协议支持这种callback吗?

    1. 二月 06, 2012

      非常抱歉,此为Callback功能的BUG,已在2.0.12版本修复此问题,参见:http://code.alibabatech.com/jira/browse/DUBBO-200

  2. 十二月 05, 2012

    Anonymous

    每次将调用端停止后就会出现这样的例外,以后再启动客户端此例外一直存在。

    2012-12-05 14:09:55,788 WARN [com.alibaba.dubbo.remoting.exchange.support.header.HeartBeatTask] - < [DUBBO] Exception when heartbeat to remote channel /192.168.19.31:20880, dubbo version: 2.5.3, current host: 192.168.19.31>
    com.alibaba.dubbo.remoting.RemotingException: Failed to send message Request [id=20, version=2.0.0, twoway=true, event=true, broken=false, data=null] to /192.168.19.31:39829, cause: null
    at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:108)
    at com.alibaba.dubbo.rpc.protocol.dubbo.ChannelWrappedInvoker$ChannelWrapper.send(ChannelWrappedInvoker.java:153)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.send(HeaderExchangeChannel.java:87)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.send(HeaderExchangeChannel.java:77)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.send(HeaderExchangeClient.java:111)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeartBeatTask.run(HeartBeatTask.java:64)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
    Caused by: java.nio.channels.ClosedChannelException
    at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:643)
    at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:137)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:76)
    at org.jboss.netty.channel.Channels.write(Channels.java:632)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70)
    at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:99)
    at org.jboss.netty.channel.Channels.write(Channels.java:611)
    at org.jboss.netty.channel.Channels.write(Channels.java:578)
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
    at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:98)

  3. 八月 15, 2013

    Anonymous

    用的版本是2.5.6 回调返回后一直报这个错误
    警告: [DUBBO] Exception when heartbeat to remote channel /192.168.17.93:20882, dubbo version: 2.5.3, current host: 192.168.17.93
    com.alibaba.dubbo.remoting.RemotingException: Failed to send message Request [id=19, version=2.0.0, twoway=true, event=true, broken=false, data=null] to /192.168.17.93:64005, cause: null
    at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:108)
    at com.alibaba.dubbo.rpc.protocol.dubbo.ChannelWrappedInvoker$ChannelWrapper.send(ChannelWrappedInvoker.java:153)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.send(HeaderExchangeChannel.java:87)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.send(HeaderExchangeChannel.java:77)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.send(HeaderExchangeClient.java:111)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeartBeatTask.run(HeartBeatTask.java:64)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
    Caused by: java.nio.channels.ClosedChannelException
    at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:643)
    at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:137)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:76)
    at org.jboss.netty.channel.Channels.write(Channels.java:632)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70)
    at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:99)
    at org.jboss.netty.channel.Channels.write(Channels.java:611)
    at org.jboss.netty.channel.Channels.write(Channels.java:578)
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
    at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:98)
    ... 14 more
    2013-8-15 15:22:24 com.alibaba.dubbo.common.logger.jcl.JclLogger warn
    警告: [DUBBO] Exception when heartbeat to remote channel /192.168.17.93:20882, dubbo version: 2.5.3, current host: 192.168.17.93
    com.alibaba.dubbo.remoting.RemotingException: Failed to send message Request [id=20, version=2.0.0, twoway=true, event=true, broken=false, data=null] to /192.168.17.93:64005, cause: null
    at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:108)
    at com.alibaba.dubbo.rpc.protocol.dubbo.ChannelWrappedInvoker$ChannelWrapper.send(ChannelWrappedInvoker.java:153)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.send(HeaderExchangeChannel.java:87)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.send(HeaderExchangeChannel.java:77)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.send(HeaderExchangeClient.java:111)
    at com.alibaba.dubbo.remoting.exchange.support.header.HeartBeatTask.run(HeartBeatTask.java:64)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
    Caused by: java.nio.channels.ClosedChannelException
    at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:643)
    at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:137)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:76)
    at org.jboss.netty.channel.Channels.write(Channels.java:632)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70)
    at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:99)
    at org.jboss.netty.channel.Channels.write(Channels.java:611)
    at org.jboss.netty.channel.Channels.write(Channels.java:578)
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
    at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:98)
    ... 14 more

Add Comment