scalaでjava.nio.channles.selector練習

7月からの都合でjavaの練習を開始。
とりあえずhttpproxyでも作ってみようかと、はじめServerSocket+Threadで作り始めたのだが、java.nio.channels.Selectorを使ったノンブロッキングIO方面に流れて、更にscalaになった。

とりあえずnon blockingでシングルスレッドのechoサーバが動くところまでできた。

import scala.collection.JavaConverters._;
import scala.collection.mutable.ArrayBuffer;
import java.nio.channels;


class ChannelContext(val id: Int)
{
  private val writequeue=ArrayBuffer[Byte]()
  private var pos=0
  def isEmpty=length==0
  def length=writequeue.length-pos
  def enqueue(v:Byte){ writequeue.append(v) }
  def dequeue():Byte={
    assert(pos < writequeue.length, "empty queue")
    val v=writequeue(pos)
    pos+=1
    return v
  }
  def popback(size: Int){ 
    assert(size <= pos)
    pos-=size
  }
}


object Server {

  val iobuf = java.nio.ByteBuffer.allocate(4096)

  def min(a:Int, b:Int):Int={
    if(a <= b){
      a
    }
    else{
      b
    }
  }

  def start(port: Int){
    val selector = channels.Selector.open();
    val serverChannel = channels.ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    serverChannel.socket().bind(new java.net.InetSocketAddress(port));
    serverChannel.register(selector, channels.SelectionKey.OP_ACCEPT);
    println("start port=" + port);

    var id=1
    var isEnd=false
    while(!isEnd){
      // mode
      for(key <- selector.keys.asScala; context=key.attachment.asInstanceOf[ChannelContext]){
        if(key.isValid() && context!=null){
          if(context.isEmpty){
            key.interestOps(channels.SelectionKey.OP_READ)
          }
          else{
            key.interestOps(channels.SelectionKey.OP_READ | channels.SelectionKey.OP_WRITE)
          }
        }
      }

      // select
      val selected=selector.select()
      println("selected: "+selected)
      if(selected==0){
        isEnd=true
      }
      else{
        val keys=selector.selectedKeys();
        for(key <- keys.asScala){
          if (key.isAcceptable()) {
            // accept
            val listening = key.channel().asInstanceOf[channels.ServerSocketChannel];
            val channel=listening.accept();
            assert(channel!=null, "fail to accept");
            channel.configureBlocking(false);
            val context=new ChannelContext(id);
            channel.register(selector,
              channels.SelectionKey.OP_READ,
              context);
            println("[%03d]accept".format(id));
            id+=1
          }
          else{
            val context=key.attachment.asInstanceOf[ChannelContext];
            val channel=key.channel.asInstanceOf[channels.SocketChannel];
            if(key.isValid && key.isReadable) {
              // read
              println("[%03d]isReadable".format(context.id))
              var isEnd=false;
              while(!isEnd){
                iobuf.clear()
                channel.read(iobuf) match {
                  case -1 =>
                  println("[%03d]closed".format(context.id));
                  isEnd=true
                  key.cancel()
                  case x =>
                  if(x < iobuf.capacity){
                    isEnd=true
                  }
                  iobuf.limit(x)
                  iobuf.flip()
                  while(iobuf.hasRemaining){
                    context.enqueue(iobuf.get())
                  }
                }
              }
            }
            if(key.isValid && key.isWritable){
              // write
              println("[%03d]isWritable".format(context.id))
              var isEnd=false;
              while(!context.isEmpty && !isEnd){
                assert(context.length>0, "invalid context")
                iobuf.clear()
                val size=min(iobuf.capacity, context.length)
                iobuf.limit(size)
                for(i <- 0 until size){
                  iobuf.put(context.dequeue)
                }
                iobuf.flip()
                channel.write(iobuf) match {
                  case -1 =>
                  println("closed: "+channel);
                  isEnd=true
                  key.cancel()
                  case x =>
                  if(x < iobuf.limit){
                    isEnd=true
                    context.popback(iobuf.limit-x)
                  }
                }
              }
            }
          }
        }
        keys.clear(); // bad know how
      }
    }
  }
}

object Main extends App {
  Server.start(8080)
}

javaとかscalaの錬度がいまいちであるというより、java.nioが罠だらけなのが問題だった。
特に、

Set<SelectionKey> java.nio.channels.Selector.selectedKeys() 

の仕様がよろしくない。
acceptしたりread/writeしおわったchannelをSetからわざわざ取り除かなくてはいけない。
(上のコードだとbad know howと書いてあるところ)
さもないと、acceptしおわったServerSocketが再度Select状態で来て、acceptするとnullが返ってくるということになる。
わかり難いだけであまりメリット無いと思うのだが何故このようなAPIにしちゃったの。


ほかに、java.nio.ByteBufferのlimitとかposition, flipの概念を理解するのに時間がかかった。
こっちはデザインの問題なのでいいと思うが。


下回りはだいたいできたので、こいつをベースにhttp proxyを作ってみる。
scalaらしくactorとか使いたいのだが、どのような設計にすればよいものか。