`

简单的Mina的UDP实现

阅读更多
根据Mina的官方例子修改而来。

server端:
/*
*  Licensed to the Apache Software Foundation (ASF) under one
*  or more contributor license agreements.  See the NOTICE file
*  distributed with this work for additional information
*  regarding copyright ownership.  The ASF licenses this file
*  to you under the Apache License, Version 2.0 (the
*  "License"); you may not use this file except in compliance
*  with the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing,
*  software distributed under the License is distributed on an
*  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
*  KIND, either express or implied.  See the License for the
*  specific language governing permissions and limitations
*  under the License.
*
*/ 
package com.taobao.mina.myudp; 
 
import java.awt.BorderLayout; 
import java.awt.Dimension; 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.net.SocketAddress; 
import java.util.concurrent.ConcurrentHashMap; 
 
import javax.swing.JFrame; 
import javax.swing.JLabel; 
import javax.swing.JPanel; 
import javax.swing.JTabbedPane; 
 
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; 
import org.apache.mina.filter.logging.LoggingFilter; 
import org.apache.mina.transport.socket.DatagramSessionConfig; 
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor; 
 
/**
* The class that will accept and process clients in order to properly
* track the memory usage.
*
* @author <a href="http://mina.apache.org" mce_href="http://mina.apache.org">Apache MINA Project</a>
*/ 
public class MemoryMonitor { 
 
    private static final long serialVersionUID = 1L; 
 
    public static final int PORT = 18567; 
 
    public MemoryMonitor() throws IOException { 
        // 创建UDP数据包NIO 
        NioDatagramAcceptor acceptor = new NioDatagramAcceptor(); 
        // NIO设置底层IOHandler 把服务器的本身传入 
        acceptor.setHandler(new MemoryMonitorHandler(this)); 
 
        // 设置filter 
        DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); 
        chain.addLast("logger", new LoggingFilter()); 
 
        // 设置是否重用地址? 也就是每个发过来的udp信息都是一个地址? 
        DatagramSessionConfig dcfg = acceptor.getSessionConfig(); 
        dcfg.setReuseAddress(true); 
 
        // 绑定端口地址 
        acceptor.bind(new InetSocketAddress(PORT)); 
        System.out.println("UDPServer listening on port " + PORT); 
    } 
     
    public static void main(String[] args) { 
        try { 
            new MemoryMonitor(); 
        } catch (IOException e) { 
            e.printStackTrace(); 
        } 
    } 

/*
*  Licensed to the Apache Software Foundation (ASF) under one
*  or more contributor license agreements.  See the NOTICE file
*  distributed with this work for additional information
*  regarding copyright ownership.  The ASF licenses this file
*  to you under the Apache License, Version 2.0 (the
*  "License"); you may not use this file except in compliance
*  with the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing,
*  software distributed under the License is distributed on an
*  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
*  KIND, either express or implied.  See the License for the
*  specific language governing permissions and limitations
*  under the License.
*
*/ 
package com.taobao.mina.myudp; 
 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetEncoder; 
import java.util.Date; 
 
import org.apache.mina.core.buffer.IoBuffer; 
import org.apache.mina.core.service.IoHandlerAdapter; 
import org.apache.mina.core.session.IdleStatus; 
import org.apache.mina.core.session.IoSession; 
 
/**
* Class the extends IoHandlerAdapter in order to properly handle
* connections and the data the connections send
*
* @author <a href="http://mina.apache.org" mce_href="http://mina.apache.org">Apache MINA Project</a>
*/ 
public class MemoryMonitorHandler extends IoHandlerAdapter { 
 
    private MemoryMonitor server; 
 
    public MemoryMonitorHandler(MemoryMonitor server) { 
        this.server = server; 
    } 
 
    /**
     * 异常来关闭session
     */ 
    @Override 
    public void exceptionCaught(IoSession session, Throwable cause) 
            throws Exception { 
        cause.printStackTrace(); 
        session.close(true); 
    } 
 
    /**
     * 服务器端收到一个消息
     */ 
    @Override 
    public void messageReceived(IoSession session, Object message) 
            throws Exception { 
 
        if (message instanceof IoBuffer) { 
            IoBuffer buffer = (IoBuffer) message; 
            buffer.setAutoExpand(true); 
            System.out.println("服务器端获得udp信息:" + buffer.getLong()); 
            Charset c = Charset.forName("UTF-8");           
            CharsetEncoder ce = c.newEncoder(); 
             
            // 给client返回信息 IoBuffer.wrap 
            IoBuffer clientBuffer = IoBuffer.wrap((new Date().toLocaleString() + "服务器已收到。").getBytes(c)); 
            clientBuffer.setAutoExpand(true); 
            session.setAttribute("clientbuffer", clientBuffer); 
            session.write(clientBuffer); 
        } 
    } 
 
    @Override 
    public void sessionClosed(IoSession session) throws Exception { 
        System.out.println("服务器端关闭session..."); 
    } 
 
    @Override 
    public void sessionCreated(IoSession session) throws Exception { 
        System.out.println("服务器端成功创建一个session..."); 
    } 
 
    @Override 
    public void sessionIdle(IoSession session, IdleStatus status) 
            throws Exception { 
       //  System.out.println("Session idle..."); 
    } 
 
    @Override 
    public void sessionOpened(IoSession session) throws Exception { 
        System.out.println("服务器端成功开启一个session..."); 
    } 

client端:
/*
*  Licensed to the Apache Software Foundation (ASF) under one
*  or more contributor license agreements.  See the NOTICE file
*  distributed with this work for additional information
*  regarding copyright ownership.  The ASF licenses this file
*  to you under the Apache License, Version 2.0 (the
*  "License"); you may not use this file except in compliance
*  with the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing,
*  software distributed under the License is distributed on an
*  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
*  KIND, either express or implied.  See the License for the
*  specific language governing permissions and limitations
*  under the License.
*
*/ 
package com.taobao.mina.myudp.client; 
 
import java.net.InetSocketAddress; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
 
import org.apache.mina.core.buffer.IoBuffer; 
import org.apache.mina.core.future.ConnectFuture; 
import org.apache.mina.core.future.IoFutureListener; 
import org.apache.mina.core.service.IoConnector; 
import org.apache.mina.core.service.IoHandlerAdapter; 
import org.apache.mina.core.session.IdleStatus; 
import org.apache.mina.core.session.IoSession; 
import org.apache.mina.example.udp.MemoryMonitor; 
import org.apache.mina.transport.socket.nio.NioDatagramConnector; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
 
/**
* Sends its memory usage to the MemoryMonitor server.
* 这样的写法  将本来可以单独的client写到IoHandlerAdapter 更紧凑
*
* @author <a href="http://mina.apache.org" mce_href="http://mina.apache.org">Apache MINA Project</a>
*/ 
public class MemMonClient extends IoHandlerAdapter { 
 
    private final static Logger LOGGER = LoggerFactory.getLogger(MemMonClient.class); 
 
    private IoSession session; 
 
    private IoConnector connector; 
    /**
     * Default constructor.
     */ 
    public MemMonClient() { 
        connector = new NioDatagramConnector(); 
        connector.setHandler(this); 
        ConnectFuture connFuture = connector.connect(new InetSocketAddress( 
                "localhost", MemoryMonitor.PORT));  // 这样不太好吧 
        connFuture.awaitUninterruptibly(); 
        // 给conn添加一个监听器 
        connFuture.addListener(new IoFutureListener<ConnectFuture>() { 
            public void operationComplete(ConnectFuture future) { 
                if (future.isConnected()) { 
                    session = future.getSession(); 
                    try { 
                        sendData(); 
                    } catch (InterruptedException e) { 
                        e.printStackTrace(); 
                    } 
                } else { 
                    try { 
                        throw new Exception(" 连接错误。 "); 
                    } catch (Exception e) { 
                        e.printStackTrace(); 
                    } 
                } 
            } 
        }); 
    } 
 
    private void sendData() throws InterruptedException { 
        for (int i = 0; i < 10; i++) { 
            long free = Runtime.getRuntime().freeMemory(); // 得到当前空闲内存大小 
            IoBuffer buffer = IoBuffer.allocate(8); 
            buffer.putLong(free);     // 只把剩余内存大小放入buffer, 扔给server           
            buffer.flip(); 
            session.write(buffer);    // 写入 
 
            try { 
                Thread.sleep(1000); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
                throw new InterruptedException(e.getMessage()); 
            } 
        } 
    } 
 
    @Override 
    public void exceptionCaught(IoSession session, Throwable cause) 
            throws Exception { 
        cause.printStackTrace(); 
    } 
 
    @Override 
    public void messageReceived(IoSession session, Object message) 
            throws Exception { 
        Charset c = Charset.forName("UTF-8"); 
        CharsetDecoder cd = c.newDecoder(); 
        IoBuffer buffer = (IoBuffer)message; 
        System.out.println("客户端收到来自服务器的消息String:" + (buffer.getString(cd))); 
    } 
 
    @Override 
    public void messageSent(IoSession session, Object message) throws Exception { 
        System.out.println("客户端向服务器发送信息:" + ((IoBuffer)message).getLong()); 
    } 
 
    @Override 
    public void sessionClosed(IoSession session) throws Exception { 
        System.out.println("客户端关闭了当前会话"); 
    } 
 
    @Override 
    public void sessionCreated(IoSession session) throws Exception { 
        System.out.println("客户端成功创建session"); 
    } 
 
    @Override 
    public void sessionIdle(IoSession session, IdleStatus status) 
            throws Exception { 
    } 
 
    @Override 
    public void sessionOpened(IoSession session) throws Exception { 
        System.out.println("客户端成功开启一个session id:"+session.getId()); 
    } 
 
    public static void main(String[] args) { 
        new MemMonClient(); 
    } 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics