scalaでjava.nio.channles.selector練習
7月からの都合でjavaの練習を開始。
とりあえずhttpproxyでも作ってみようかと、はじめServerSocket+Threadで作り始めたのだが、java.nio.channels.Selectorを使ったノンブロッキングIO方面に流れて、更にscalaになった。
とりあえずnon blockingでシングルスレッドのechoサーバが動くところまでできた。
import scala.collection.JavaConverters._; import scala.collection.mutable.ArrayBuffer; import java.nio.channels; class ChannelContext(val id: Int) { private val writequeue=ArrayBuffer[Byte]() private var pos=0 def isEmpty=length==0 def length=writequeue.length-pos def enqueue(v:Byte){ writequeue.append(v) } def dequeue():Byte={ assert(pos < writequeue.length, "empty queue") val v=writequeue(pos) pos+=1 return v } def popback(size: Int){ assert(size <= pos) pos-=size } } object Server { val iobuf = java.nio.ByteBuffer.allocate(4096) def min(a:Int, b:Int):Int={ if(a <= b){ a } else{ b } } def start(port: Int){ val selector = channels.Selector.open(); val serverChannel = channels.ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new java.net.InetSocketAddress(port)); serverChannel.register(selector, channels.SelectionKey.OP_ACCEPT); println("start port=" + port); var id=1 var isEnd=false while(!isEnd){ // mode for(key <- selector.keys.asScala; context=key.attachment.asInstanceOf[ChannelContext]){ if(key.isValid() && context!=null){ if(context.isEmpty){ key.interestOps(channels.SelectionKey.OP_READ) } else{ key.interestOps(channels.SelectionKey.OP_READ | channels.SelectionKey.OP_WRITE) } } } // select val selected=selector.select() println("selected: "+selected) if(selected==0){ isEnd=true } else{ val keys=selector.selectedKeys(); for(key <- keys.asScala){ if (key.isAcceptable()) { // accept val listening = key.channel().asInstanceOf[channels.ServerSocketChannel]; val channel=listening.accept(); assert(channel!=null, "fail to accept"); channel.configureBlocking(false); val context=new ChannelContext(id); channel.register(selector, channels.SelectionKey.OP_READ, context); println("[%03d]accept".format(id)); id+=1 } else{ val context=key.attachment.asInstanceOf[ChannelContext]; val channel=key.channel.asInstanceOf[channels.SocketChannel]; if(key.isValid && key.isReadable) { // read println("[%03d]isReadable".format(context.id)) var isEnd=false; while(!isEnd){ iobuf.clear() channel.read(iobuf) match { case -1 => println("[%03d]closed".format(context.id)); isEnd=true key.cancel() case x => if(x < iobuf.capacity){ isEnd=true } iobuf.limit(x) iobuf.flip() while(iobuf.hasRemaining){ context.enqueue(iobuf.get()) } } } } if(key.isValid && key.isWritable){ // write println("[%03d]isWritable".format(context.id)) var isEnd=false; while(!context.isEmpty && !isEnd){ assert(context.length>0, "invalid context") iobuf.clear() val size=min(iobuf.capacity, context.length) iobuf.limit(size) for(i <- 0 until size){ iobuf.put(context.dequeue) } iobuf.flip() channel.write(iobuf) match { case -1 => println("closed: "+channel); isEnd=true key.cancel() case x => if(x < iobuf.limit){ isEnd=true context.popback(iobuf.limit-x) } } } } } } keys.clear(); // bad know how } } } } object Main extends App { Server.start(8080) }
javaとかscalaの錬度がいまいちであるというより、java.nioが罠だらけなのが問題だった。
特に、
Set<SelectionKey> java.nio.channels.Selector.selectedKeys()
の仕様がよろしくない。
acceptしたりread/writeしおわったchannelをSet
(上のコードだとbad know howと書いてあるところ)
さもないと、acceptしおわったServerSocketが再度Select状態で来て、acceptするとnullが返ってくるということになる。
わかり難いだけであまりメリット無いと思うのだが何故このようなAPIにしちゃったの。
ほかに、java.nio.ByteBufferのlimitとかposition, flipの概念を理解するのに時間がかかった。
こっちはデザインの問題なのでいいと思うが。
下回りはだいたいできたので、こいつをベースにhttp proxyを作ってみる。
scalaらしくactorとか使いたいのだが、どのような設計にすればよいものか。