Smalltalk India Meetup

Gave a short talk at the Smalltalk India Meetup. It was a live presentation of “concurrent objects” in Spark-Scheme:

Concurrent objects are represented as closures. All computation takes place by message passing. There is no shared state, so you are free from all the problems associated with explicit locking (or the lack of it). If you are familiar with Erlang, you know what I am talking about. Here is the definition of a tiny object that computes the areas of various geometric shapes:

(import (match)) ;; for pattern matching messages

(define pi 3.14159)

(define (area-server self) ;; self is a unique integer that
                           ;; identifies this object within the system.
  (let loop ((message (receive self)))
    (unless (eq? message 'exit)
      (match message
        (('circle r)
         (printf "area of circle = ~a~n" (* pi (* r r))))
        (('rectangle h w)
         (printf "area of rectangle = ~a~n" (* h w)))
        (_ (printf "unknown message - ~a~n" message)))
      (flush-output) ;; We need this because the server
                     ;; is running in its own process.
      (loop (receive self)))))

A new instance of area-server is created by calling the spawn function:

> (spawn area-server)
=> 1

The integer id returned by spawn is used to send messages to the concurrent object:

> (send 1 '(circle 10))
 => area of circle = 314.159

As the number of objects in the system grows, it might become hard to keep track of the object ids. It is convenient to map the id to a name and use that name in send and receive:

> (register 1 "area-server")
> (send "area-server" '(rectangle 3 4))
=> area of rectangle = 12

The object prints out the result. Let us see how we can make it actually return the result. We need to change the protocol so that the object receives the id of the client that made the request, and uses that id to send the result back to that particular client. Here is our modified area-server:

(define (area-server self)
  (let loop ((message (receive self)))
    (unless (eq? message 'exit)
            (match message
                   (('circle r client-pid)
                    (send client-pid (* pi (* r r))))
                   (('rectangle h w client-pid)
                    (send client-pid (* h w)))
                   (_ (printf "unknown message - ~a~n" message)))
            (flush-output)
            (loop (receive self)))))

Let us define a client to test our new server:

(define (area-client self)
    (send "area-server" (list 'circle 20 self)) ;; Note that we append 
                                                ;; our id to the message.
    (printf "result = ~a~n" (receive self)) ;; area-server sends back 
                                            ;; the result and we print it.
    (flush-output))

Make sure that everything works fine:

> (spawn area-server)
=> 3
> (register 3 "area-server")
> (spawn area-client)
=> 4 ;; id of client
=> result = 1256.636 ;; result received from area-server.
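For readers more at home in Java, here is a rough analogue of this request/reply protocol. It is a sketch under assumptions, not how Spark-Scheme works internally: a LinkedBlockingQueue stands in for each process mailbox, and the client passes its own reply queue along with the request, playing the role of client-pid. The names AreaServer, Request and ask are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical Java analogue of the second area-server: every request carries
// the client's own mailbox, which plays the role of client-pid.
final class AreaServer {
    static final double PI = 3.14159;

    // A request: shape tag, its dimensions, and where to send the answer.
    static final class Request {
        final String shape;
        final double[] dims;
        final BlockingQueue<Double> replyTo;

        Request(String shape, double[] dims, BlockingQueue<Double> replyTo) {
            this.shape = shape;
            this.dims = dims;
            this.replyTo = replyTo;
        }
    }

    // The server's mailbox; only the server thread takes from it,
    // so the server's state needs no locking.
    private final BlockingQueue<Request> mailbox = new LinkedBlockingQueue<>();

    // Start the message loop in its own thread, much like (spawn area-server).
    static AreaServer spawn() {
        AreaServer server = new AreaServer();
        Thread t = new Thread(server::loop);
        t.setDaemon(true); // do not keep the JVM alive just for the server
        t.start();
        return server;
    }

    private void loop() {
        try {
            while (true) {
                Request r = mailbox.take();          // like (receive self)
                switch (r.shape) {
                    case "circle":
                        r.replyTo.add(PI * r.dims[0] * r.dims[0]);
                        break;
                    case "rectangle":
                        r.replyTo.add(r.dims[0] * r.dims[1]);
                        break;
                    default:
                        System.out.println("unknown message - " + r.shape);
                        r.replyTo.add(Double.NaN);   // unblock the waiting client
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // stop the loop, keep the flag
        }
    }

    // Client side: send a request tagged with a fresh reply mailbox, then wait.
    double ask(String shape, double... dims) throws InterruptedException {
        BlockingQueue<Double> reply = new LinkedBlockingQueue<>();
        mailbox.add(new Request(shape, dims, reply));
        return reply.take();
    }
}
```

A client calls, for example, `server.ask("circle", 20)` and blocks until the reply arrives, much like `(receive self)` in area-client.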

To make it more useful, let us wrap our client in another function:

(define (spawn-area-client server-id message)
  (spawn
    (lambda (self)
      (register self "area-client")
      (send server-id (append message (list self)))
      (printf "result = ~a~n" (receive self))
      (flush-output))))

Now it is easier to test various message patterns:

> (spawn-area-client "area-server" (list 'circle 3.4))
=> 10
=> result = 36.31678039999999
> (spawn-area-client "area-server" (list 'rectangle 4.5 6.7))
=> 11
=> result = 30.150000000000002

Concurrent objects are location agnostic. You can develop and test components in a single VM and later deploy them across a network, with little ceremony. To enable the area-server to receive messages from the network, you just need to start the remoting service:

> (remoting!)

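The client should qualify its name with its location (name@node), so that the server knows where to send the reply. In spawn-area-client, the send becomes: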

(send server-id (append message (list "area-client@node2")))

That’s all we need to make objects at different locations communicate with each other:

> (remoting!)
> (spawn-area-client "area-server@node1" (list 'circle 10.56))
=> 1
=> result = 350.330010624

Before we conclude, a word about fault tolerance. A process can exit the message loop on its own or it could get killed if the VM sends it the kill signal. We can assign a process to watch other processes (or concurrent objects). When an object dies or gets killed, all its watchers are notified. Here is a watcher process that restarts area-server whenever it dies as a result of a kill signal:

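(define (watcher pid proc proc-name)
  (spawn
   (lambda (watcher-id)
     (watch pid watcher-id) ;; a watcher can watch any number of processes.
     (let loop ((message (receive watcher-id)))
       (if (and (pair? message) (eq? (car message) 'killed))
           ;; Restart the process under the same name and watch the new one.
           (let ((new-id (spawn proc)))
             (register new-id proc-name)
             (watcher new-id proc proc-name))
           ;; Any other notification: keep waiting.
           (loop (receive watcher-id)))))))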
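Java has no safe equivalent of kill (Thread.stop is deprecated), so the sketch below treats a thread dying from an uncaught exception as the 'killed notification. The hypothetical Watcher class runs a worker, waits for it to end, and starts a fresh instance if it crashed, up to a limit. It only illustrates the restart idea; it is not Spark's watch mechanism.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical supervisor: runs a worker in its own thread and starts a fresh
// instance whenever the previous one dies from an uncaught exception.
final class Watcher {

    // Returns how many times the worker had to be restarted.
    static int superviseUntilClean(Supplier<Runnable> factory, int maxRestarts)
            throws InterruptedException {
        int restarts = 0;
        while (true) {
            AtomicBoolean crashed = new AtomicBoolean(false);
            Thread worker = new Thread(factory.get());
            // Stands in for the 'killed notification of the Scheme watcher.
            worker.setUncaughtExceptionHandler((t, e) -> crashed.set(true));
            worker.start();
            worker.join();                     // wait until the worker ends or dies
            if (!crashed.get() || restarts == maxRestarts) {
                return restarts;
            }
            restarts++;                        // spawn a new instance, as the watcher does
        }
    }
}
```

The factory matters: like `(spawn proc)`, each restart gets a brand-new instance rather than reusing the dead one.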

The watcher is assigned to watch the running area-server:

> (watcher 3 area-server "area-server")

Here 3 is the id of the area-server process to watch. If that process gets killed, the watcher restarts it and binds it to the name “area-server”. We can kill a running process by calling the kill function:

> (kill 3)

Now if you go to the client node (node2) and send a message, you will still get the result, this time from the new process spawned by the watcher. The re-spawning happens quickly because processes are very lightweight objects living in the VM itself. We can run tens of thousands of such concurrent objects efficiently in a single instance of Spark.

Well, that’s all there is to concurrent objects!!

PS: If you are wondering why I was allowed to talk about a Lisp implementation at a Smalltalk event, there could be many reasons for that:

  • Lisp is the grand-daddy of all dynamic languages
  • Message-passing is of great interest to Smalltalkers
  • Alan Kay is the biggest fan of Lisp!

Looking forward to the next meetup of the Smalltalk India Group. I met some people there who are really passionate about what they are doing in this amazing language!

Learning Scala

Spent a couple of hours playing with Scala. Very detailed type system and, as a consequence, very complex syntax. Still, I was able to put together in a short time something that does a real job: a web crawler. The crawler can fetch as many sites in parallel as you want. An actor takes care of each site, and an ExecutorService inside the actor takes care of each document. Pretty fast. It probably works for any crawl job that doesn’t have many corner cases. It depends on the Jericho HTML Parser. The complete code is listed below. (Disclaimer: This is just a finger exercise in a language that I have only just learned. Use at your own risk. Please don’t report bugs :-).)

// Compile: 
// > scalac -cp e:/jericho-html-3.1/dist/jericho-html-3.1.jar Crawler.scala
// Run:
// > scala -cp e:/jericho-html-3.1/dist/jericho-html-3.1.jar;. Crawler http://somesite1.com http://somesite2.com ...

import java.net._
import java.io._
import java.util.ArrayList
import java.util.concurrent._
import scala.actors.Actor

import net.htmlparser.jericho._

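object CrawlState {
  private var crawledLinks = Set[String]()

  // Returns true if link was seen before; otherwise records it and returns false.
  // Synchronized because every CrawlerJob actor shares this object.
  def isCrawled(link: String): Boolean = synchronized {
    if (crawledLinks contains link) true
    else {
      crawledLinks += link
      false
    }
  }
}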

class CrawlerJob(targetUrl: String) extends Actor {

  class FutureInfo(surl: String, sout: ByteArrayOutputStream) {
    def url(): String = surl
    def out(): ByteArrayOutputStream = sout
  }

  private var futures = new scala.collection.immutable.Stack[java.util.concurrent.Future[FutureInfo]]
  private val executor = Executors.newCachedThreadPool()

  class UrlFetcher(surl: String) extends Callable[FutureInfo] {
    def call(): FutureInfo = {
      println("fetching " + surl)
      val url = new URL(surl)
      val con: HttpURLConnection = url.openConnection.asInstanceOf[HttpURLConnection]
      val in = new BufferedInputStream(con.getInputStream)
      val buffer = new Array[Byte](1024)
      val out = new ByteArrayOutputStream()
      try {
        var read = in.read(buffer)
        while (read != -1) {
          // Copy only the bytes actually read; the rest of the buffer
          // may still hold data from a previous iteration.
          out.write(buffer, 0, read)
          read = in.read(buffer)
        }
      } finally {
        in.close()
      }
      new FutureInfo(surl, out)
    }
  }

  def act() {
    crawl(targetUrl)
    while (futures.nonEmpty) {
      val future = futures.top
      futures = futures.pop
      try {
        val finfo = future.get(200, TimeUnit.MILLISECONDS)
        handle(finfo)
      } catch {
        // Not finished yet: put it back and poll again.
        case to_e: TimeoutException => futures = futures.push(future)
        case e: Exception => println(e getMessage)
      }
    }
    // Every fetch has completed; release the worker threads.
    executor.shutdown()
  }

  def crawl(urlToCrawl: String) = {
    if (crawlable(urlToCrawl)) {
      val surl = escapeSpaces(urlToCrawl)
      val future = executor.submit(new UrlFetcher(surl))
      futures = futures.push(future)
    }
  }

  def handle(finfo: FutureInfo) = {
    val surl = finfo.url
    val out = finfo.out
    val folderAndFile = extractFolderAndFile(surl)
    val isHtml = folderAndFile.last.contains(".htm")
    val file = createFile(folderAndFile)
    saveToFile(file, out)

    val links = new ArrayList[String]
    if (isHtml) {
      val source = new Source(new ByteArrayInputStream(out.toByteArray))
      addLinks(source, HTMLElementName.A, links)
      addLinks(source, HTMLElementName.IMG, links)
    }

    val c = links.size
    var i = 0
    while (i < c) {
      val link = links.get(i)
      if (link != null && !CrawlState.isCrawled(link)) {
        // Follow only links on the same site: absolute links under targetUrl,
        // relative links, or links without a scheme.
        var canCrawl = (link.startsWith(targetUrl) || link.startsWith(".")
                        || link.startsWith("/"))
        if (!canCrawl) {
          canCrawl = !link.contains("://")
        }
        if (canCrawl) {
          val tlink = linkify(removeFile(surl), link)
          crawl(tlink)
        }
      }
      i = i + 1
    }
  }

  private def crawlable(url: String): Boolean = {
    !(url.contains("#") || url.contains("mailto:")
      || url.contains("<") || url.contains(">")
      || url.contains("=") || url.contains(".cgi")
      || url.contains(".php")) // && so on ...
  }

  private def escapeSpaces(url: String): String = {
    url.replace(" ", "%20")
  }

  // Strips the last path segment: "http://a.com/x/y.html" -> "http://a.com/x".
  private def removeFile(url: String): String = {
    val schemeEnd = url.indexOf("://")
    if (schemeEnd >= 0) {
      val idx = schemeEnd + 3
      val prefix = url.substring(0, idx)
      var suffix = url.substring(idx)
      val slash = suffix.lastIndexOf("/")
      if (slash > 0) {
        suffix = suffix.substring(0, slash)
      }
      return prefix + suffix
    }
    return url
  }

  private def linkify(targetUrl: String, link: String): String = {
    if (link.contains("://"))
      return link
    var mlink = link
    if (link.startsWith("."))
      mlink = link.substring(1)
    if (!mlink.startsWith("/"))
      mlink = "/" + mlink
    return targetUrl + mlink
  }

  private def addLinks(source: Source, elemName: String,
                       linksList: ArrayList[String]) = {
    val links = source.getAllElements(elemName)
    val count = links.size
    val key = if (elemName == HTMLElementName.IMG) "src" else "href"
    var i = 0
    while (i < count) {
      linksList.add(links.get(i).getAttributeValue(key))
      i = i + 1
    }
  }

  private def extractFolderAndFile(surl: String): List[String] = {
    var file = "index.html"
    var start = surl.indexOf("://") + 3
    var idx = surl.indexOf("/", start)
    if (idx < 0)
      return List(surl.substring(start).replace(":", "_"), file)
    var folder = surl.substring(start, idx)
    while (idx > 0) {
      start = idx
      idx = surl.indexOf("/", start + 1)
      if (idx > 0)
        folder = folder + surl.substring(start, idx)
    }

    if (surl.length - start > 0) {
      file = surl.substring(start + 1, surl.length)
    }
    if (file.length == 0)
      file = "index.html"
    return List(folder.replace(":", "_"), file)
  }

  private def createFile(folderAndFile: List[String]): File = {
    val folder = new File(folderAndFile.head)
    if (!folder.exists()) {
      folder.mkdirs()
    }
    return new File(folder, folderAndFile.last)
  }

  private def saveToFile(file: File, buffer: ByteArrayOutputStream) = {
    val fout = new FileOutputStream(file)
    try {
      fout.write(buffer.toByteArray)
    } finally {
      fout.close() // release the file handle even if the write fails
    }
  }
}

object Crawler {
  def main(args: Array[String]) = {
    // Start one CrawlerJob actor per site given on the command line.
    args foreach (targetUrl => {
      val crawler = new CrawlerJob(targetUrl)
      crawler.start()
    })
  }
}
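CrawlState is shared by all CrawlerJob actors, so its check-then-add must happen as one atomic step; otherwise two actors can both see a link as new and fetch it twice. As an illustration in Java (the class VisitedLinks is hypothetical and not part of the crawler), a concurrent set provides this directly, because Set.add reports whether the element was already present:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical crawl-wide "visited" set that is safe to share between
// several crawler threads, the role CrawlState plays in the Scala code.
final class VisitedLinks {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // add() returns false if the link was already present, so the check
    // and the insert happen as a single atomic operation.
    boolean isCrawled(String link) {
        return !seen.add(link);
    }
}
```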

Alan Cooper's apology

Have you ever cursed VB? Alan Cooper, the “Father of Visual Basic”, offers his apology here. In fact, he is not to blame for Visual Basic, the language. What he created was the visual designer part, which was later bought by Microsoft. This tool was originally called “Tripod”. It pioneered the idea of the software IDE with a widget palette and drag-and-drop form design. It also came with a small language for executing commands. Each command and widget (or gizmo, as Cooper called them) was implemented as a DLL, which made Tripod user-extensible. These ideas revolutionized the programming tools landscape. Remember the RAD fad of the ’nineties? Microsoft renamed Tripod “Ruby” (!!) and later replaced the command language with Basic, a choice Alan Cooper disliked. Ruby became Visual Basic. You can read the whole story here.

BTW, if you are interested in product/UI design, you may want to look at the work of his company.

LSP

Liskov Substitution Principle (LSP) is a principle in Object-Oriented Programming which states that

derived types must be completely substitutable for their base types.

Functions that receive a reference to a base class object should also be able to take a reference to a derived class object without being aware of it. This matters because, at run time, a reference to a base type object may actually point to a derived type object; if the derived type does not honor the behavior of the base type, code written for the base type may misbehave or produce wrong results. Adding a derived type to the system should not require changes to existing code. This principle is often violated by naive class hierarchies based on the is-a relationship. The canonical example is a square class derived from a rectangle. A square is a rectangle with the additional constraint that its width and height must be equal, so it might seem natural to derive square from rectangle and impose the constraint by overriding the modifiers.

Let us first implement the rectangle class. Then we will derive square from it and see how it violates the Substitution Principle. After that we will devise a better design by introducing contracts that control how methods may be overridden by a derived class. I chose CLOS as the implementation language because it provides a simple mechanism for adding method contracts.

Here is the definition of the rectangle class:

(defclass rectangle ()
  ((width :accessor width :initform 0)
   (height :accessor height :initform 0)))

(defgeneric set-width (rect w))
(defgeneric set-height (rect h))

(defmethod set-width ((rect rectangle) w)
  (setf (width rect) w))

(defmethod set-height ((rect rectangle) h)
  (setf (height rect) h))

A method, check-area, imposes a system-wide policy that the area of a rectangle must always be 200:

(defmethod check-area ((rect rectangle))
  (assert (= (* (width rect) (height rect)) 200))
  t)

> (defvar r (make-instance 'rectangle))
> (set-width r 100)
> (set-height r 2)
> (check-area r) ;; => T
> (set-height r 200)
> (check-area r) ;; => assertion fails.

Later it was required that a square type be added to the system. Its definition is based on the assumption that a square is-a rectangle. As we proceed with the implementation, it is observed that a square requires only a single field to represent both its width and height. The problem is solved by overriding the modifier methods and setting both fields simultaneously to the same value. (A good programmer will take this duplication of data as the first indication that something is broken in the design. He will not try to work around it!)

(defclass square (rectangle) ())

(defmethod set-width ((sqr square) w)
  (setf (height sqr) w)
  (setf (width sqr) w))

(defmethod set-height ((sqr square) h)
  (setf (width sqr) h)
  (setf (height sqr) h))

The above class definition violates the Substitution Principle, as the following test shows: check-area fails unless it is modified to take special care of square objects:

;; somewhere up, `r` was made to point to a `square` object.
(set-width r 100)
(set-height r 2)
(check-area r) ;; => assertion fails!
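The same violation is easy to reproduce in Java. This is a hypothetical translation of the CLOS example (Rectangle, Square and AreaPolicy are illustrative names), with checkArea written only against the base type:

```java
// Hypothetical Java rendering of the CLOS example: Square overrides the
// setters of Rectangle so that width and height always stay equal.
class Rectangle {
    protected double width, height;

    void setWidth(double w) { width = w; }
    void setHeight(double h) { height = h; }
    double area() { return width * height; }
}

class Square extends Rectangle {
    @Override void setWidth(double w) { width = w; height = w; }
    @Override void setHeight(double h) { width = h; height = h; }
}

final class AreaPolicy {
    // Written against Rectangle, like check-area: after setting
    // width 100 and height 2, the area must be 200.
    static boolean checkArea(Rectangle r) {
        r.setWidth(100);
        r.setHeight(2);
        return r.area() == 200;
    }
}
```

Passing a Square to checkArea returns false, because setHeight(2) also resets the width to 2.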

How can we prevent the creation of derived types that violate LSP? Programming by Contract, where preconditions and post-conditions are established for class methods, helps solve this problem. The precondition must be true for the method to execute; upon completion, the method guarantees that the post-condition is true. We will redefine rectangle with pre/post conditions that make sure that changing one field does not affect the other. The :before and :after method qualifiers in CLOS allow us to add these conditions:

(defclass rectangle ()
  ((width :accessor width :initform 0)
   (height :accessor height :initform 0)
   (old-width :accessor old-width)
   (old-height :accessor old-height)))

;; set-width precondition.
;; Makes sure that w is > 0 and saves the value of height.
(defmethod set-width :before ((rect rectangle) w)
  (assert (> w 0))
  (setf (old-height rect) (height rect)))

(defmethod set-width ((rect rectangle) w)
  (setf (width rect) w))

;; set-width post-condition.
;; Makes sure that the method that changed the value of width,
;; did not change the value of height also.
(defmethod set-width :after ((rect rectangle) w)
  (assert (= (height rect) (old-height rect))))

;; pre/post conditions similar to set-width are also
;; applied to set-height.

(defmethod set-height :before ((rect rectangle) h)
  (assert (> h 0))
  (setf (old-width rect) (width rect)))

(defmethod set-height ((rect rectangle) h)
  (setf (height rect) h))

(defmethod set-height :after ((rect rectangle) h)
  (assert (= (width rect) (old-width rect))))

Once these conditions are established, the overridden modifiers of square refuse to work, as the post-condition assertion fails:

> (set-width r 100) ;; assertion failed.

The implementor of square has to unlink it from rectangle and rethink the design. By a rule of Programming by Contract, a derived class may only replace the precondition of a method with a weaker one, and its post-condition with a stronger one. (Because the :before and :after methods of a class also run for instances of its subclasses in CLOS, it is difficult to break this rule here.)
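Java has no :before or :after methods, so one common way to get a similar guarantee, sketched here under that assumption, is the template method pattern: the base class makes the public setters final and wraps overridable hooks with the pre- and post-conditions. The names CheckedRectangle, CheckedSquare, doSetWidth and doSetHeight are hypothetical.

```java
// Hypothetical sketch: final public setters enforce the contract, and
// subclasses may only override the protected hooks in between.
class CheckedRectangle {
    protected double width, height;

    public final void setWidth(double w) {
        if (!(w > 0)) throw new IllegalArgumentException("precondition: w > 0");
        double oldHeight = height;                   // like the :before method
        doSetWidth(w);
        if (height != oldHeight)                     // like the :after method
            throw new IllegalStateException("postcondition: height changed");
    }

    public final void setHeight(double h) {
        if (!(h > 0)) throw new IllegalArgumentException("precondition: h > 0");
        double oldWidth = width;
        doSetHeight(h);
        if (width != oldWidth)
            throw new IllegalStateException("postcondition: width changed");
    }

    // Overridable hooks; the checks above always run around them.
    protected void doSetWidth(double w) { width = w; }
    protected void doSetHeight(double h) { height = h; }
}

// The square-as-rectangle subclass, now rejected at run time.
class CheckedSquare extends CheckedRectangle {
    @Override protected void doSetWidth(double w) { width = w; height = w; }
    @Override protected void doSetHeight(double h) { width = h; height = h; }
}
```

Calling `new CheckedSquare().setWidth(100)` throws IllegalStateException, which mirrors the failing post-condition assertion in the CLOS version.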

An even better design that will allow both rectangle and square to stay within the same hierarchy is to define a common base class for all shapes and derive rectangle and square from it:

(defclass shape () ())

(defgeneric set-width (shape w))
(defgeneric set-height (shape h))

(defclass rectangle (shape)
  ((width :accessor width :initform 0)
   (height :accessor height :initform 0)
   (old-width :accessor old-width)
   (old-height :accessor old-height)))

;; set-width precondition.
;; Makes sure that w is > 0 and saves the value of height.
(defmethod set-width :before ((rect rectangle) w)
  (assert (> w 0))
  (setf (old-height rect) (height rect)))

(defmethod set-width ((rect rectangle) w)
  (setf (width rect) w))

;; set-width post-condition.
;; Makes sure that the method that changed the value of width,
;; did not change the value of height also.
(defmethod set-width :after ((rect rectangle) w)
  (assert (= (height rect) (old-height rect))))

;; pre/post conditions similar to set-width are also
;; applied to set-height.

(defmethod set-height :before ((rect rectangle) h)
  (assert (> h 0))
  (setf (old-width rect) (width rect)))

(defmethod set-height ((rect rectangle) h)
  (setf (height rect) h))

(defmethod set-height :after ((rect rectangle) h)
  (assert (= (width rect) (old-width rect))))

(defmethod check-area ((rect rectangle))
  (assert (= (* (width rect) (height rect)) 200))
  t)

;; type square
(defclass square (shape)
   ((side :accessor side :initform 0)))

(defmethod set-width ((sqr square) w)
  (setf (side sqr) w))

(defmethod set-height ((sqr square) h)
  (setf (side sqr) h))
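A Java sketch of the same idea, with rectangle and square as siblings under a common Shape interface. The area method and all class names here are additions for illustration; the CLOS version above does not define them.

```java
// Hypothetical Java version of the shape hierarchy: rectangle and square are
// siblings, so code written against RectShape never receives a square.
interface Shape {
    void setWidth(double w);
    void setHeight(double h);
    double area();
}

final class RectShape implements Shape {
    private double width, height;

    public void setWidth(double w) { width = w; }
    public void setHeight(double h) { height = h; }
    public double area() { return width * height; }
}

final class SquareShape implements Shape {
    private double side;  // a single field, as in the CLOS square

    public void setWidth(double w) { side = w; }
    public void setHeight(double h) { side = h; }
    public double area() { return side * side; }
}

final class ShapePolicy {
    // Like check-area, this policy accepts only rectangles.
    static boolean checkArea(RectShape r) { return r.area() == 200; }
}
```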

Thread.interrupt()

The only sensible way to cancel an executing Java thread is to interrupt it. Calling interrupt sets a boolean interrupted status flag in the thread object to true. It does not mean that the thread will stop immediately; it only requests the thread to stop at its next cancellation point. Cancellation points are blocking methods such as sleep, wait and join. If the interrupted status is set, these methods clear the flag and throw an InterruptedException.

What should a caller do if it catches an InterruptedException? The answer depends on who owns the thread. The object that launched the thread is considered its owner; for instance, a thread pool owns all the worker threads it started. Only the owner knows what to do when a thread is interrupted. The code the owner executes when a thread is interrupted is known as its interruption policy or cancellation policy. The owner is denied the chance to execute this policy if the InterruptedException is not propagated up to it.

To make this idea clear, let us look at the implementation of a simple TaskExecutor. This class receives a Task and executes it in a thread. TaskExecutor also exposes a method to cancel a running task. A task is represented by this simple interface:

public interface Task {
    public void execute () throws InterruptedException;
}

execute declares InterruptedException so that implementers get a chance to propagate it to the TaskExecutor.
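The cancellation mechanics described at the start of this section can be observed in isolation. The hypothetical InterruptDemo below interrupts a thread blocked in sleep and records two things: that InterruptedException was thrown, and whether the interrupted status was still set when the exception was caught.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of what interrupt() does to a thread blocked in sleep().
final class InterruptDemo {

    // Returns {exceptionSeen, statusAfterCatch}.
    static boolean[] run() throws InterruptedException {
        AtomicBoolean caught = new AtomicBoolean();
        AtomicBoolean statusAfterCatch = new AtomicBoolean(true);
        CountDownLatch started = new CountDownLatch(1);

        Thread sleeper = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);             // a cancellation point
            } catch (InterruptedException e) {
                caught.set(true);
                // sleep cleared the flag when it threw the exception.
                statusAfterCatch.set(Thread.currentThread().isInterrupted());
            }
        });
        sleeper.start();
        started.await();
        sleeper.interrupt();                      // request cancellation
        sleeper.join();
        return new boolean[] { caught.get(), statusAfterCatch.get() };
    }
}
```

If the interrupt arrives just before sleep is entered, sleep sees the flag already set and throws at once, so the result is the same either way: the exception is seen and the flag is cleared.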

The TaskExecutor receives a task through its submit method and passes it on to a TaskThread, where it is executed. The TaskExecutor object owns this thread. When the thread is interrupted, the TaskThread calls the method that executes the interruption policy defined by TaskExecutor. An interruption policy may free up some thread-specific resource, reset some state, or do something else that only the owner knows about. (As TaskExecutor has only one thread, the thread also calls a global cleanup method before it finishes running. In a thread pool, this method might be called when the last thread is interrupted or exits normally.) Here is the code that defines the TaskThread class:

public class TaskThread extends Thread {

     public TaskThread (Task task, TaskExecutor owner) {
         this.task = task;
         this.owner = owner;
     }

     public void run () {
         try {
             task.execute ();
         } catch (InterruptedException ex) {
             if (owner != null)
                 owner.executeInterruptionPolicy ();
         } finally {
             if (owner != null) {
                 owner.cleanup ();
             }
         }
     }

     private Task task;
     private TaskExecutor owner;
 }

Now, the definition of TaskExecutor itself:

public class TaskExecutor {

    public void submit (Task task) {
        t = new TaskThread (task, this);
        // Set the flag before starting the thread: a task that finishes
        // quickly could otherwise run cleanup() first and leave running == true.
        running = true;
        t.start ();
    }

    public void executeInterruptionPolicy () {
        System.out.println ("executing interruption policy ...");
    }

    public void cleanup () {
        if (running) {
            System.out.println ("cleaning up ...");
            running = false;
        }
    }

    public void cancel () {
        if (running)
            t.interrupt ();
    }

    public boolean isRunning () {
        return running;
    }

    private TaskThread t;
    // volatile: written by the task thread (cleanup), read by the caller's thread.
    private volatile boolean running;
}

The cancel method just calls interrupt on the running thread, so that if the task makes a blocking call on that thread, the call throws an InterruptedException. As the Task implementation does not own the thread and cannot be sure of its interruption policy, it should not consume this exception. Here is a Task implementation that correctly hands the InterruptedException over to someone who knows what it means to interrupt this particular thread:

public class CounterTask implements Task {

    public void execute () throws InterruptedException {
        int i = 0;
        while (i < 10) {
            System.out.print (++i + " ");
            try {
                Thread.sleep (500);
            } catch (InterruptedException ex) {
                throw ex;
            }
        }
    }
}

The following code submits a CounterTask to the executor. The user can cancel the task by typing q and pressing Enter:

public class Test {
    public static void main (String args[]) {
        TaskExecutor t = new TaskExecutor ();
        t.submit (new CounterTask ());
        while (t.isRunning ()) {
            try {
                // System.in is line-buffered, so the 'q' arrives after Enter.
                int c = System.in.read ();
                if ((char) c == 'q') {
                    t.cancel ();
                    break;
                }
            } catch (java.io.IOException ex) { }
        }
    }
}

A sample run:

$ java.exe Test
1 2 3 4 5 6 q
executing interruption policy ...
cleaning up ...

If you comment out throw ex; in CounterTask.execute(), TaskExecutor.executeInterruptionPolicy() will not run.

To summarize: the proper way to cancel a running thread is to call interrupt on it. The thread can detect the request either by polling its interrupted status (by calling isInterrupted) or by catching InterruptedException. Consume this exception only if you own the thread. Otherwise, rethrow it and give the owner of the thread a chance to execute the proper interruption policy.
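Sometimes code cannot rethrow the exception, for example inside Runnable.run, which declares no checked exceptions. A common idiom in that case is to restore the interrupted status by calling Thread.currentThread().interrupt(), so that the owner, or a loop that polls isInterrupted, still sees the request. A minimal sketch combining both detection styles; the class name PollingWorker is hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker that cannot rethrow InterruptedException
// (Runnable.run declares no checked exceptions).
final class PollingWorker implements Runnable {
    final AtomicInteger ticks = new AtomicInteger();

    @Override
    public void run() {
        // 1. Poll the interrupted status between units of work.
        while (!Thread.currentThread().isInterrupted()) {
            ticks.incrementAndGet();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // 2. sleep cleared the flag. This code does not own the thread,
                // so it restores the flag instead of swallowing the interrupt;
                // the loop condition then sees it and the worker exits.
                Thread.currentThread().interrupt();
            }
        }
    }
}
```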