Learning Scala

Spent a couple of hours playing with Scala. Very detailed type system, and (as a consequence) very complex syntax. But I was able to put together in a short time something that does a real job – a web crawler. The crawler can fetch as many sites as you want in parallel. An actor takes care of each site. An ExecutorService in the actor takes care of each document. Pretty fast. Probably works for any crawl job that doesn’t have many corner cases. Has a dependency on the Jericho HTML Parser. The complete code is listed below. (Disclaimer: This is just a finger exercise in a language that I newly learned. Use at your own risk. Please don’t report bugs :–).)

// Compile: 
// > scalac -cp e:/jericho-html-3.1/dist/jericho-html-3.1.jar Crawler.scala
// Run:
// > scala -cp e:/jericho-html-3.1/dist/jericho-html-3.1.jar;. Crawler http://somesite1.com http://somesite2.com ...

import java.net._
import java.io._
import java.util.ArrayList
import java.util.concurrent._
import scala.actors.Actor

import net.htmlparser.jericho._

/**
 * Process-wide registry of the links that have already been handed to a crawler.
 *
 * Every CrawlerJob consults this single object, and each job runs as its own
 * actor, so all access goes through the object's monitor.
 */
object CrawlState {
  // A Set gives constant-time membership tests; the previous List made every
  // lookup and every append linear in the number of links seen so far.
  private var crawledLinks = Set[String]()

  /**
   * Checks whether `link` has been seen before and records it if it has not.
   *
   * The check and the insert happen under one lock, so two callers racing on
   * the same link cannot both be told it is new.
   *
   * @param link the link to look up
   * @return false the first time a given link is passed in, true afterwards
   */
  def isCrawled(link: String): Boolean = synchronized {
    val seen = crawledLinks.contains(link)
    if (!seen) {
      crawledLinks += link
    }
    seen
  }
}

/**
 * Actor that crawls one site, starting at `targetUrl`.
 *
 * Every document is downloaded by a UrlFetcher task submitted to a private
 * thread pool. The actor polls the pending futures, saves each finished
 * document to disk under a folder derived from its URL, and queues the links
 * found in HTML documents for fetching.
 *
 * @param targetUrl the URL the crawl starts from; absolute links are only
 *                  followed when they start with this prefix
 */
class CrawlerJob(targetUrl: String) extends Actor {

  /** Pairs a fetched URL with the bytes that were downloaded from it. */
  class FutureInfo(surl: String, sout: ByteArrayOutputStream) {
    def url(): String = surl
    def out(): ByteArrayOutputStream = sout
  }

  // Fetches that have been submitted but not yet handled.
  private var futures = new scala.collection.immutable.Stack[java.util.concurrent.Future[FutureInfo]]
  private val executor = Executors.newCachedThreadPool()

  /** Task that downloads the document at `surl` into memory. */
  class UrlFetcher(surl: String) extends Callable[FutureInfo] {
    def call(): FutureInfo = {
      println("fetching " + surl)
      val con = new URL(surl).openConnection.asInstanceOf[HttpURLConnection]
      val in = new BufferedInputStream(con.getInputStream)
      try {
        val out = new ByteArrayOutputStream()
        val buffer = new Array[Byte](1024)
        var read = in.read(buffer)
        while (read != -1) {
          // Copy only the bytes this read produced. Writing the whole buffer
          // would append stale bytes whenever a read comes back short.
          out.write(buffer, 0, read)
          read = in.read(buffer)
        }
        new FutureInfo(surl, out)
      } finally {
        // Release the connection's stream even when the download fails.
        in.close()
      }
    }
  }

  /**
   * Runs the crawl: submits the start URL, then keeps polling pending fetches
   * until none are left. A fetch that is not done within 200 ms is put back
   * and retried; any other failure is printed and that document is dropped.
   */
  def act(): Unit = {
    try {
      crawl(targetUrl)
      while (futures.nonEmpty) {
        val future = futures.top
        futures = futures.pop
        try {
          handle(future.get(200, TimeUnit.MILLISECONDS))
        } catch {
          case _: TimeoutException => futures = futures.push(future)
          case e: Exception => println(e.getMessage)
        }
      }
    } finally {
      // No more work can be submitted once the loop ends; let the pool's
      // threads terminate instead of lingering until their idle timeout.
      executor.shutdown()
    }
  }

  /** Submits `urlToCrawl` for download unless `crawlable` rejects it. */
  def crawl(urlToCrawl: String): Unit = {
    if (crawlable(urlToCrawl)) {
      val surl = escapeSpaces(urlToCrawl)
      futures = futures.push(executor.submit(new UrlFetcher(surl)))
    }
  }

  /**
   * Saves a downloaded document to disk and, when its file name contains
   * ".htm", queues the `a` and `img` links found in it.
   *
   * @param finfo the finished fetch (URL plus downloaded bytes)
   */
  def handle(finfo: FutureInfo): Unit = {
    val surl = finfo.url()
    val out = finfo.out()
    val folderAndFile = extractFolderAndFile(surl)
    val isHtml = folderAndFile.last.contains(".htm")
    saveToFile(createFile(folderAndFile), out)

    val links = new ArrayList[String]
    if (isHtml) {
      val source = new Source(new ByteArrayInputStream(out.toByteArray))
      addLinks(source, HTMLElementName.A, links)
      addLinks(source, HTMLElementName.IMG, links)
    }

    val count = links.size
    var i = 0
    while (i < count) {
      val link = links.get(i)
      // isCrawled also records the link, so each link is considered only once.
      if (link != null && !CrawlState.isCrawled(link)) {
        // Follow links into the target site and anything without a scheme.
        val canCrawl = link.startsWith(targetUrl) || link.startsWith(".") ||
          link.startsWith("/") || !link.contains("://")
        if (canCrawl) {
          crawl(linkify(removeFile(surl), link))
        }
      }
      i += 1
    }
  }

  /** Rejects URLs with fragments, mail links, markup, query values and scripts. */
  private def crawlable(url: String): Boolean = {
    !(url.contains("#") || url.contains("mailto:")
      || url.contains("<") || url.contains(">")
      || url.contains("=") || url.contains(".cgi")
      || url.contains(".php")) // && so on ...
  }

  private def escapeSpaces(url: String): String = {
    url.replace(" ", "%20")
  }

  /**
   * Strips the last path segment from `url`, leaving its "directory".
   * A URL without "://" is returned unchanged.
   */
  private def removeFile(url: String): String = {
    val schemeEnd = url.indexOf("://")
    if (schemeEnd < 0) {
      url
    } else {
      val hostStart = schemeEnd + 3
      val prefix = url.substring(0, hostStart)
      val rest = url.substring(hostStart)
      val lastSlash = rest.lastIndexOf("/")
      prefix + (if (lastSlash > 0) rest.substring(0, lastSlash) else rest)
    }
  }

  /**
   * Turns `link` into an absolute URL. Links that already contain "://" are
   * returned as they are; everything else is appended to `targetUrl` after
   * dropping one leading "." and ensuring a leading "/".
   */
  // NOTE(review): "/x" and "../x" are appended to the current directory rather
  // than resolved against the host root or the parent directory.
  private def linkify(targetUrl: String, link: String): String = {
    if (link.contains("://")) {
      link
    } else {
      val withoutDot = if (link.startsWith(".")) link.substring(1) else link
      val path = if (withoutDot.startsWith("/")) withoutDot else "/" + withoutDot
      targetUrl + path
    }
  }

  /**
   * Appends to `linksList` the link attribute of every `elemName` element in
   * `source`: "src" for images, "href" otherwise. Elements without the
   * attribute contribute null.
   */
  private def addLinks(source: Source, elemName: String,
                       linksList: ArrayList[String]): Unit = {
    val key = if (elemName == HTMLElementName.IMG) "src" else "href"
    val elements = source.getAllElements(elemName)
    val count = elements.size
    var i = 0
    while (i < count) {
      linksList.add(elements.get(i).getAttributeValue(key))
      i += 1
    }
  }

  /**
   * Splits a URL into the local folder and file name used to store it.
   *
   * @return a two-element list: the folder (host plus directories, with ":"
   *         replaced by "_") and the file name ("index.html" when the URL
   *         has no file part)
   */
  private def extractFolderAndFile(surl: String): List[String] = {
    val defaultFile = "index.html"
    val hostStart = surl.indexOf("://") + 3
    val firstSlash = surl.indexOf("/", hostStart)
    if (firstSlash < 0) {
      List(surl.substring(hostStart).replace(":", "_"), defaultFile)
    } else {
      var folder = surl.substring(hostStart, firstSlash)
      var start = firstSlash
      var idx = firstSlash
      // Walk slash by slash; `start` ends up on the last slash in the URL.
      while (idx > 0) {
        start = idx
        idx = surl.indexOf("/", start + 1)
        if (idx > 0) {
          folder = folder + surl.substring(start, idx)
        }
      }
      val name = surl.substring(start + 1)
      List(folder.replace(":", "_"), if (name.length == 0) defaultFile else name)
    }
  }

  /** Creates the target folder if needed and returns the file inside it. */
  private def createFile(folderAndFile: List[String]): File = {
    val folder = new File(folderAndFile.head)
    if (!folder.exists()) {
      folder.mkdirs()
    }
    new File(folder + "/" + folderAndFile.last)
  }

  /** Writes the buffered bytes to `file`, closing the stream afterwards. */
  private def saveToFile(file: File, buffer: ByteArrayOutputStream): Unit = {
    val fout = new FileOutputStream(file)
    try {
      fout.write(buffer.toByteArray)
    } finally {
      fout.close()
    }
  }
}

/** Command-line entry point: starts one CrawlerJob actor per URL argument. */
object Crawler {
  def main(args: Array[String]): Unit = {
    for (targetUrl <- args) {
      new CrawlerJob(targetUrl).start()
    }
  }
}

Thread.interrupt()

The only sensible way to cancel an executing Java thread is by interrupting it. This will set a boolean interrupted status flag in the thread object to true. Calling interrupt on a thread does not mean that it will immediately stop. It just requests the thread to interrupt itself at the next blocking call. These blocking calls comprise methods such as sleep, wait and join. They are known as cancellation points. If the interrupted status is set, these methods clear that flag and throw an InterruptedException. What should a caller do if it catches an InterruptedException? The answer depends on the ownership of the thread. The object that launched the thread is considered its owner. For instance, a thread pool is the owner of all worker threads that it started. Only the owner knows what to do when a thread is interrupted. Code executed by the owner when a thread is interrupted is known as its interruption policy or cancellation policy. The owner will be denied the chance to execute this policy if the InterruptedException is not propagated up to it. To make this idea clear, let us look at the implementation of a simple TaskExecutor. This class receives a Task and executes it in a thread. TaskExecutor also exposes a method to cancel a running task. A task is represented by this simple interface:

/**
 * A unit of work that a TaskExecutor runs on a thread it owns.
 */
public interface Task {
    /**
     * Performs the work.
     *
     * @throws InterruptedException if the executing thread is interrupted;
     *         implementations let it escape so the thread's owner sees it
     */
    void execute() throws InterruptedException;
}

execute throws the InterruptedException so that implementers get a chance to propagate it to the TaskExecutor.

The TaskExecutor receives a task through its submit method. This task is passed on to a TaskThread where it is executed. The TaskExecutor object owns this thread. When it is interrupted, the TaskThread calls the appropriate method that executes the interruption policy as defined by TaskExecutor. An interruption policy may free up some thread specific resource, reset some state or do something else that only the owner knows of. (As we have only one thread in the TaskExecutor, the thread also calls a global cleanup method before it finishes running. In the case of a thread pool, this method might be called before the last thread is interrupted or when it exits normally). Here is the code that defines the TaskThread class:

/**
 * Thread that runs a single Task on behalf of a TaskExecutor and reports
 * interruption and completion back to that owner.
 */
public class TaskThread extends Thread {

    private final Task task;
    private final TaskExecutor owner;

    /**
     * @param task  the work to run
     * @param owner the executor to notify; may be null, in which case nobody
     *              is notified
     */
    public TaskThread(Task task, TaskExecutor owner) {
        this.task = task;
        this.owner = owner;
    }

    /**
     * Runs the task. An interruption is handed to the owner's interruption
     * policy; the owner's cleanup runs however the task ends.
     */
    @Override
    public void run() {
        try {
            task.execute();
        } catch (InterruptedException ex) {
            onInterrupted();
        } finally {
            onFinished();
        }
    }

    /** Lets the owner, if there is one, apply its interruption policy. */
    private void onInterrupted() {
        if (owner == null) {
            return;
        }
        owner.executeInterruptionPolicy();
    }

    /** Lets the owner, if there is one, run its cleanup. */
    private void onFinished() {
        if (owner == null) {
            return;
        }
        owner.cleanup();
    }
}

Now, the definition of TaskExecutor itself:

/**
 * Runs one Task at a time on a TaskThread that it owns, and defines what
 * happens when that thread is interrupted or finishes.
 */
public class TaskExecutor {

    private TaskThread t;
    // Written by the task thread (cleanup) and read by callers of isRunning
    // and cancel on other threads, so it must be volatile to be visible.
    private volatile boolean running;

    /**
     * Starts executing the task on a new TaskThread.
     *
     * @param task the work to run
     */
    public void submit (Task task) {
        t = new TaskThread (task, this);
        // Set the flag before starting the thread: a task that finishes
        // immediately would otherwise run cleanup() first and the flag would
        // then be left stuck at true.
        running = true;
        t.start ();
    }

    /** Called by the task thread when its task was interrupted. */
    public void executeInterruptionPolicy () {
        System.out.println ("executing interruption policy ...");
    }

    /** Called by the task thread when it is about to finish. */
    public void cleanup () {
        if (running) {
            System.out.println ("cleaning up ...");
            running = false;
        }
    }

    /** Requests cancellation by interrupting the task thread, if one is running. */
    public void cancel () {
        if (running)
            t.interrupt ();
    }

    /** @return true from submit until the task thread has run cleanup */
    public boolean isRunning () {
        return running;
    }
}

The cancel method just calls interrupt on the running thread, so that, if the task makes a blocking call on the thread, it will throw an InterruptedException. As the Task implementation does not own the thread and cannot be sure of its interruption policy, it should not consume this exception. Here is a Task implementation that correctly hands over the InterruptedException to someone who knows what it means to interrupt this particular thread:

/**
 * Task that prints the numbers 1 to 10, pausing half a second after each one.
 */
public class CounterTask implements Task {

    /**
     * Prints the counter values, sleeping between them.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping;
     *         it is passed on untouched because this task does not own the thread
     */
    public void execute () throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            System.out.print (i + " ");
            try {
                Thread.sleep (500);
            } catch (InterruptedException ex) {
                // Hand the interruption to the thread's owner.
                throw ex;
            }
        }
    }
}

The following code submits a CounterTask to the executor. The user can cancel the task by typing the ‘q’ key:

public class Test {
    public static void main (String args[]) {
        TaskExecutor t = new TaskExecutor ();
        t.submit (new CounterTask ());
        while (t.isRunning ()) {
        try {
            int c = System.in.read ();
            if ((char)c == 'q') {
                t.cancel ();
                break;
            }
        } catch (java.io.IOException ex) { }
    }
}

A sample run:

$ java.exe Test
1 2 3 4 5 6 q
executing interruption policy ...
cleaning up ...

If you comment out throw ex; in CounterTask.execute(), TaskExecutor.executeInterruptionPolicy() will not run.

To summarize :– The proper way to cancel a running thread is to call interrupt on it. The thread can detect the request either by polling its interrupted status (by calling isInterrupted) or by catching the InterruptedException. Consume this exception only if you own the thread. Otherwise re-throw the exception and give the owner of the thread a chance to execute the proper interruption policy.

invokedynamic

JDK 7 features. The most notable one is the new invokedynamic instruction, which will give dynamic languages first class status on the JVM. People are patching up things here. A VM for an Object Oriented language should be built around dynamic typing and true message passing. Dynamic typing is important because good software engineering means exploratory programming and prototyping. Message passing should be preferred over method calls because it melds well with dynamic typing, concurrency and component distribution. Smalltalk and Erlang got these right from the very beginning.

JDK 7 also features automatic resource management. If you have programmed in Common Lisp, you know what that means:

;; The stream is closed when control leaves the form, normally or otherwise.
(with-open-file (out "some.data" :direction :output)
      (write-string "whew!!" out))

Now you can do the same stuff in Java:

// The resource has to be declared, with its type, inside the parentheses.
try (FileOutputStream file = new FileOutputStream("some.dat")) {
       file.write(buf, 0, len);
} // file is automatically closed here.

The last great idea in language design is probably (at least) 35 years old. Glad to see the world’s leading ‘enterprise language’ catching up – slowly!

PS: Static type checking, if required, can be implemented in the language being implemented itself. EOPL contains the best explanation (I have seen) on this topic.

ABCL vs SISC

Did a small benchmark of two complete Lisp implementations for the JVM – ABCL (ANSI Common Lisp) and SISC (R5RS). When it comes to invoking Java objects, both perform equally well. But in number crunching that includes floating point arithmetic, SISC simply outperforms ABCL by a factor of 7. Probably type declarations and other optimizations might improve the performance of ABCL. But I didn’t try that. The code used for the test is listed below.

Test for ABCL:

;;;; test.lisp
;;;; Micro-benchmark for ABCL: floating point multiplication (test1) and
;;;; calls into java.lang.String (test2).

;;; Multiplies the loop counter into the running product.
(defun calc (i a)
    (* i a))

;;; Folds CALC over the integers from X to Y inclusive, starting from 1.0,
;;; and returns the final product.
(defun test1 (x y)
    (let ((a 1.0))
        (loop for i from x to y do (setf a (calc i a)))
     a))

;;; Java handles used by test2. They are declared with DEFPARAMETER because
;;; assigning to an undeclared global with SETF is not portable.
(defparameter *jstring-class* (jclass "java.lang.String"))
(defparameter *jstring-cons* (jconstructor *jstring-class* *jstring-class*))
(defparameter *jstring-concat* (jmethod *jstring-class* "concat" *jstring-class*))

;;; Creates a java.lang.String from STR through the String(String) constructor.
(defun make-jstring (str) (jnew *jstring-cons* str))

;;; Calls String.concat once for every integer from X to Y inclusive,
;;; appending a fresh "hello" each time. Returns T.
(defun test2 (x y)
     (let ((a (make-jstring "hey")))
            (loop for i from x to y do
               (setf a (jcall *jstring-concat* a
                     (make-jstring "hello"))))
            t))

ABCL test run and results:

CL-USER(1): (load "test.lisp")
T
CL-USER(2): (time (test1 1 1000000))
13.295 seconds real time
8013706 cons cells    
CL-USER(3): (time (test2 1 10000))
1.626 seconds real time
80170 cons cells

Test for SISC:

;;;; test.ss
;;;; Micro-benchmark for SISC: floating point multiplication (test1) and
;;;; calls into java.lang.String (test2).

 (import s2j)

 ;; Multiplies the loop counter into the running product.
 (define (calc i a)
    (* i a))

 ;; Folds calc over the integers from x to y, starting from 1.0, and returns
 ;; the final product. The upper bound is inclusive so that the iteration
 ;; count matches the ABCL version of this test.
 (define (test1 x y)
      (let loop ((i x) (a 1.0))
          (if (<= i y)
             (loop (add1 i) (calc i a))
       a)))

(define jstring-class (java-class '|java.lang.String|))
(define jstring-concat (generic-java-method '|concat|))

;; Creates a java.lang.String from the Scheme string str. It is deliberately
;; not called make-string, which would replace the standard procedure of
;; that name.
(define (make-jstring str)
    (java-new jstring-class (->jstring str)))

;; Calls String.concat once for every integer from x to y inclusive,
;; appending a fresh "hello" each time. Returns #t.
(define (test2 x y)
    (let loop ((i x) (a (make-jstring "hey")))
         (if (<= i y)
             (loop (add1 i) (jstring-concat a
                     (make-jstring "hello")))
         #t)))

SISC test run and results:

> (load "test.ss")
> (time (test1 1 1000000))
(1407 ms)
> (time (test2 1 10000))
(1719 ms)

Better synchronization in Java

A few tips for better thread synchronization in Java from Java Concurrency in Practice:

1. Narrow lock scope

Lock only the portion of code that really needs to be locked. This can dramatically reduce the time threads wait to acquire the lock. Consider the following method:

// Method-level lock: validate(item) also runs while the lock on `this` is held.
public synchronized boolean addItem(Item item) {
    return validate(item) && itemsArray.add(item);
}

The method level lock prevents two threads from concurrently running validate(item), which need not be synchronized. To get the full benefit of multi-threading we change the method like this:

// Only the update of itemsArray is locked; validation runs without the lock.
public boolean addItem(Item item) {
    if (!validate(item)) {
        return false;
    }
    synchronized (this) {
        return itemsArray.add(item);
    }
}

2. Reduce lock granularity

If there are two objects whose concurrent modification is sure not to create havoc, synchronize them with dedicated locks. For instance, here the thread that adds an Item will have to wait until the thread that adds an Order is finished:

// Adds a validated item; the update is guarded by the lock on `this`.
public boolean addItem(Item item) {
    boolean valid = validate(item);
    if (!valid) {
        return false;
    }
    synchronized (this) {
        return itemsArray.add(item);
    }
}

// Adds a validated order; the update is guarded by the lock on `this`.
public boolean addOrder(Order order) {
    boolean valid = validate(order);
    if (!valid) {
        return false;
    }
    synchronized (this) {
        return ordersArray.add(order);
    }
}

The performance can be improved by increasing the granularity of the locks:

// Dedicated lock objects, one per collection. They are final so that a lock
// can never be swapped for a different object while threads synchronize on it.
private final Object itemLock = new Object();
private final Object orderLock = new Object();

// Adds a validated item while holding only the item lock.
public boolean addItem(Item item) {
    if (!validate(item)) {
        return false;
    }
    synchronized (itemLock) {
        return itemsArray.add(item);
    }
}

// Adds a validated order while holding only the order lock.
public boolean addOrder(Order order) {
    if (!validate(order)) {
        return false;
    }
    synchronized (orderLock) {
        return ordersArray.add(order);
    }
}

3. Stripe locks, if possible

A single collection can be partition locked, like we see in the implementation of this Map:

/**
 * Sketch of a map whose buckets are guarded by a fixed set of locks: a key's
 * hash code selects which of the N_LOCKS locks protects it.
 */
public class HighlyResponsiveMap {
    private static final int N_LOCKS = 16;
    private final Object[] locks = new Object[N_LOCKS];
    private final Node[] buckets;

    /**
     * @param numBuckets number of hash buckets to allocate
     */
    public HighlyResponsiveMap(int numBuckets) {
        buckets = new Node[numBuckets];
        for (int i = 0; i < N_LOCKS; ++i) {
            locks[i] = new Object();
        }
    }

    /**
     * Looks up the value stored for key while holding only the lock that
     * covers the key's hash code.
     */
    public Object get(Object key) {
        int hash = key.hashCode();
        synchronized (locks[lockIndex(hash)]) {
            // Find node from the bucket and return its value.
        }
        return null;
    }

    /**
     * Maps a hash code to a lock index. hashCode() may be negative, and in
     * Java a negative value modulo N_LOCKS is negative too, so the sign bit
     * is cleared first to keep the index inside the array.
     */
    private static int lockIndex(int hash) {
        return (hash & 0x7fffffff) % N_LOCKS;
    }
}

4. Avoid object pools

In days of yore, when allocating memory for a new Object was really slow, people used to depend on object pools. These days creating a new object in Java often outperforms C’s malloc. Synchronizing an object pool can be more expensive than creating a new Object when needed. If you are using Java 5 or later, avoid object pools.

Value semantics

“Value semantics” is used to refer to classes whose objects, when copied, give two independent copies with the same value. C++ has value semantics for both built-in and user-defined types. This is in accordance with one of C++’s core design principles – “support user-defined and built-in types equally well”. For example:

int a = 10;
int b = a; // b is initialized as a copy of a: two independent ints with the same value.
++b; // Changes the value of `b' to 11, but `a' still remains 10.

// A user-defined type that represents a 64-bit integer.
class Int64
{
public: // class members are private by default; these must be callable by users
     // Allows initialization from a built-in integer, as in `Int64 a = 10;'.
     Int64(long long val);
     // Supports value semantics by defining the
     //  copy constructor and the = operator.
     Int64(const Int64& val);
     Int64& operator=(const Int64& val);
     // Pre-increment, as in `++b;'.
     Int64& operator++();
};

Int64 a = 10;
Int64 b = a; // b is a copy of a: two distinct Int64 objects with the same value.
++b; // Only b changes, a remains the same.

Java has value semantics for primitive types like int and char. But user-defined types have “reference semantics”:

class Int64 {
    // ... (members elided in this example)
}

Int64 a = new Int64(10);
Int64 b = a; // a and b point to the same object.
b.increment(); // The one shared object changes, so the change is seen through both a and b.

This is often a source of confusion to newcomers.

typedefs are good

The designers of Java omitted typedefs on the grounds that they introduce “too much context”. This is true to some extent. I mostly depend on typedefs for declaring generic containers. This saves a lot of keystrokes and screen space, especially when declaring functions that consume or return such containers:

typedef map<string, int> NameAgeMap;

static void InitSample(NameAgeMap& sample)
{
   sample["joe"] = 20;
   sample["mark"] = 22;
}

// ....

NameAgeMap sample;
InitSample(sample);
NameAgeMap::const_iterator iter = sample.begin();
while (iter != sample.end())
{
   std::cout << iter->first << ':' << iter->second << '\n';
   ++iter;
}

If the new type is properly named, we need not look at the typedef to infer what it really is. An IDE might even show the real type when the mouse is hovered over the variable. Contrast this with Java. I have to type the lengthy container declaration wherever I need it:

// Fills the map with two fixed name/age entries.
public void InitSample(HashMap<String, Integer> sample) {
    sample.put("joe", 20);
    sample.put("mark", 22);
}

// ...

HashMap<String, Integer> sample = new HashMap<String, Integer>();
InitSample(sample);
Set<String> keys = sample.keySet();
// Print every entry as "name:age".
for (String name : keys) {
    Integer age = sample.get(name);
    System.out.println(name + ':' + age);
}

I suppose that the argument against typedef was made before Java had generics. Since the designers later reversed their position on enum, Java may eventually introduce typedef as well.

Why use private locks in Java?

What is wrong with the following code?

/**
 * Counter that hands out increasing values, guarded by its own intrinsic
 * lock (the monitor of the Counter instance itself).
 */
public class Counter {
    private int value;

    /**
     * Increments the counter and returns the new value. This is the name the
     * demo code calls.
     */
    public synchronized int next() {
        return ++value;
    }

    /** Same as next(); kept so that code written against this name still works. */
    public synchronized int getNext() {
        return next();
    }
}

It is correctly synchronized, but can cause a liveness problem if some code obtains a lock on an instance of Counter:

/**
 * Demonstrates the liveness problem: main holds the lock on the Counter
 * while waiting for a thread that needs that same lock.
 */
public class Test {

    /** Thread that prints one value taken from the shared counter. */
    public static class CounterThread extends Thread {
        private final Counter counter;

        public CounterThread(Counter counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            System.out.println(counter.next());
        }
    }

    public static void main(String[] args) throws Exception {
        Counter c = new Counter();
        CounterThread ct = new CounterThread(c);
        // The thread's call to c.next() has to wait for the lock on c, and
        // main only releases that lock after join() returns, which never
        // happens. The program hangs here.
        synchronized (c) {
            ct.start();
            ct.join();
        }
    }
}

The solution is to use a private lock object rather than using Counter’s intrinsic lock. Make sure that this lock is not published:

/**
 * Counter that hands out increasing values, guarded by a lock object that
 * only this class can reach.
 */
public class Counter {
    private int value;
    // Private and never handed out, so no outside code can hold this lock.
    // Final, so every call synchronizes on the same object.
    private final Object lock = new Object();

    /** Increments the counter and returns the new value. */
    public int next() {
        synchronized (lock) {
            return ++value;
        }
    }
}

Java concurrency quirks

Java concurrency is surprisingly difficult to get right. Some operations that look atomic can be especially deceptive. For instance, look at the following simple class. It has only one method which does an operation that looks atomic:

// Example class: get() is written as a single ++ expression and has no
// synchronization of any kind.
public class UniqueNumber {
    // Returns a unique integer value.
    // (Intended contract; nothing here prevents two threads from interleaving
    // inside the increment.)
    public int get() {
        return ++val;
    }
    // Last value handed out; starts at 0.
    private int val;
}

The ++ operator applied to an integer might look like an atomic operation but it can lead to incorrect results if an object of UniqueNumber is shared by multiple threads. Why? Because ++ is not really atomic as we see from the following byte code dump:

> javap -c UniqueNumber
........
........
public int get();
Code:
0: aload_0
1: dup
2: getfield #2; //Field val:I
5: iconst_1
6: iadd
7: dup_x1
8: putfield #2; //Field val:I
11: ireturn

++ gets expanded to VM operations that load the variable, add 1 to it and save the new value back. Now imagine two threads simultaneously calling get() on a UniqueNumber object. (The opcodes executed are shown as comma separated numbers with reference to the above byte code listing and the current stack is shown in square brackets):

Thread1 => 0,1,2 [0]
Thread2 => 0,1,2 [0], 5 [0 1], 6 [1]
Thread1 => 5 [0 1], 6 [1]
.... and so on ....

In the end both threads end up fetching the value 1, which is not what the UniqueNumber class was expected to produce!

Even fetching values directly from “primitive” types can be non-atomic operations at the VM level. This is true for 64-bit types like long and double as a Java implementation (especially on a 32 bit arch) can choose to implement those types using two 32 bit values. Then reading and writing a long/double will get expanded to two VM opcodes. If primitive types like boolean, int, long and double are shared between threads (even if only one thread does all the writing) declare them as volatile.