Coding With Fun

Concurrency in Scala


May 14, 2021 Scala


Runnable/Callable

The Runnable interface has a single method, which does not return a value.

trait Runnable {
  def run(): Unit
}
Callable is similar, except that it returns a value.

trait Callable[V] {
  def call(): V
}
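
As a small illustration, here is a sketch that implements both interfaces, using the java.util.concurrent.Callable and java.lang.Runnable types that the traits above mirror. The names Greeter, Answer, and RunnableCallableDemo are made up for this example.

```scala
import java.util.concurrent.Callable

object RunnableCallableDemo {
  // A Runnable performs an action and returns nothing.
  class Greeter(out: StringBuilder) extends Runnable {
    def run(): Unit = { out.append("hello"); () }
  }

  // A Callable computes and returns a value.
  class Answer extends Callable[Int] {
    def call(): Int = 6 * 7
  }

  // Calling run() or call() directly executes on the current thread.
  def demo(): (String, Int) = {
    val out = new StringBuilder
    new Greeter(out).run()
    (out.toString, new Answer().call())
  }
}
```

Neither interface says anything about threads by itself; that only comes in once an instance is handed to a Thread or an Executor.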

Thread

Scala concurrency is built on top of the Java concurrency model.

On Sun JVM, for IO-intensive tasks, we can run thousands of threads on a single machine.

A Thread takes a Runnable. You have to call start on the Thread in order for it to run the Runnable.

scala> val hello = new Thread(new Runnable {
  def run(): Unit = {
    println("hello world")
  }
})
hello: java.lang.Thread = Thread[Thread-3,5,main]

scala> hello.start
hello world

When you see a class implementing the Runnable interface, you know that its purpose is to run in a thread.
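
Note that start returns immediately. If the caller needs to wait for the thread to finish, it can call join. The following is a minimal sketch of that; the object name ThreadJoinDemo and the thread name worker-1 are invented for illustration.

```scala
object ThreadJoinDemo {
  // Starts a worker thread, waits for it, and returns the name it recorded.
  def runAndWait(): String = {
    var seenBy = ""
    val worker = new Thread(new Runnable {
      def run(): Unit = { seenBy = Thread.currentThread().getName }
    }, "worker-1")
    worker.start() // returns immediately; run() executes on the new thread
    worker.join()  // blocks until run() has finished and makes its writes visible
    seenBy
  }
}
```

Calling ThreadJoinDemo.runAndWait() should report the worker's name rather than the caller's, because run() was executed by the new thread.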

Single-threaded code

Here's a piece of code that works but has a problem.

import java.net.{ServerSocket, Socket}

// poolSize is unused here; it comes into play in the thread-pool version.
class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)

  def run(): Unit = {
    while (true) {
      // This will block until a connection comes in.
      val socket = serverSocket.accept()
      // Handle the request on the calling thread, one at a time.
      (new Handler(socket)).run()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  // The response is the name of the thread that handles the request.
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run(): Unit = {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run()

Each request gets a response containing the name of the current thread, which is always main.

The main drawback of this code is that only one request can be answered at a time!

You could put each request in its own thread. Simply change

(new Handler(socket)).run()

to

(new Thread(new Handler(socket))).start()

But what if you want to reuse threads or have other policies for thread behavior?

Executors

With the release of Java 5, it was decided that a more abstract interface to threads was needed.

You can get an ExecutorService through static methods on the Executors object. These methods give you an ExecutorService configured with one of several policies, such as a thread pool.
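
To make this concrete, here is a minimal sketch that runs several small jobs on a fixed-size pool and counts how many distinct threads did the work. The object and method names are made up for this example; Executors.newFixedThreadPool, execute, shutdown, and awaitTermination are the standard java.util.concurrent calls.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object FixedPoolDemo {
  // Runs `tasks` small jobs on a pool of `poolSize` threads and returns
  // how many distinct threads ended up doing the work.
  def distinctThreads(tasks: Int, poolSize: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val names = ConcurrentHashMap.newKeySet[String]() // thread-safe set
    try {
      for (_ <- 1 to tasks) {
        pool.execute(new Runnable {
          def run(): Unit = { names.add(Thread.currentThread().getName); () }
        })
      }
    } finally {
      pool.shutdown() // stop accepting work; already queued tasks still run
    }
    pool.awaitTermination(5, TimeUnit.SECONDS)
    names.size
  }
}
```

With ten tasks and a pool of two, the result can never exceed two, since the pool reuses its threads instead of creating one per task.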

Let's rewrite our earlier blocking network server to handle concurrent requests.

import java.net.{ServerSocket, Socket}
import java.util.concurrent.{Executors, ExecutorService}

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)
  // A fixed pool reuses at most poolSize threads across all requests.
  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

  def run(): Unit = {
    try {
      while (true) {
        // This will block until a connection comes in.
        val socket = serverSocket.accept()
        // Hand the request to the pool and go straight back to accepting.
        pool.execute(new Handler(socket))
      }
    } finally {
      pool.shutdown()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run(): Unit = {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run()

Here's a transcript of connecting to it, showing how the internal threads are reused.

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

Futures

A Future represents an asynchronous computation. You can wrap your computation in a Future and, when you need the result, simply call its blocking get() method. An Executor returns a Future. If you use the Finagle RPC system, you use Future instances to hold results that might not have arrived yet.

A FutureTask is a Runnable that is designed to be run by an Executor.

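import java.util.concurrent.{Callable, FutureTask}

// searcher, target, and executor are assumed to be defined elsewhere.
val future = new FutureTask[String](new Callable[String]() {
  def call(): String = {
    searcher.search(target)
  }
})
executor.execute(future)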

Now I need the result, so block until it's done.

val blockingResult = future.get()
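
In practice you rarely need to build the FutureTask yourself: ExecutorService.submit accepts a Callable and hands back a Future. A self-contained sketch of that follows, with a trivial sum standing in for the search; the object and method names are invented for this example.

```scala
import java.util.concurrent.{Callable, Executors, Future}

object SubmitDemo {
  // Sums the numbers on a background thread and blocks for the result.
  def sumInBackground(xs: Seq[Int]): Int = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      // submit schedules the Callable and returns a Future for its result.
      val future: Future[Int] = executor.submit(new Callable[Int] {
        def call(): Int = xs.sum
      })
      future.get() // blocks until the computation has finished
    } finally {
      executor.shutdown()
    }
  }
}
```

If call() throws, get() rethrows the failure wrapped in an ExecutionException, so real code usually handles that case as well.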

The Finagle lesson in Scala School uses Futures heavily, including some nice ways to combine them. Effective Scala also has a section on Futures (http://twitter.github.com/effectivescala/#Twitter's standard libraries-Futures).

Thread safety problems

class Person(var name: String) {
  def set(changedName: String): Unit = {
    name = changedName
  }
}

This program is not safe in a multithreaded environment. If two threads hold a reference to the same Person instance and both call set, you cannot predict what name will be once both calls have finished.

In the Java memory model, each processor is allowed to cache values in its L1 or L2 cache, so two threads running on different processors can each have their own view of the data.

Let's look at some tools that force threads to keep a consistent view of the data.

Three tools

synchronized

Mutexes provide ownership semantics. When you enter a mutex, you own it. The most common way to use a mutex on the JVM is to synchronize on something. In this example, we synchronize on the Person instance.

On the JVM, you can synchronize on any instance that is not null.

class Person(var name: String) {
  def set(changedName: String): Unit = {
    this.synchronized {
      name = changedName
    }
  }
}
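
The effect is easier to see with a read-modify-write operation than with a plain assignment. The sketch below is not from the original lesson; Counter and countWith are hypothetical names. It has several threads increment a shared counter inside a synchronized block and checks that no update is lost.

```scala
object SynchronizedCounterDemo {
  class Counter {
    private var count = 0
    // Only one thread at a time can be inside a block synchronized on `this`.
    def increment(): Unit = this.synchronized { count += 1 }
    def get: Int = this.synchronized { count }
  }

  // Starts `threads` threads that each increment the counter `perThread` times.
  def countWith(threads: Int, perThread: Int): Int = {
    val counter = new Counter
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = for (_ <- 1 to perThread) counter.increment()
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    counter.get
  }
}
```

Without the synchronized blocks, two threads could read the same old value and both write back old + 1, so the total would sometimes come up short.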

volatile

With the memory model changes in Java 5, volatile and synchronized give essentially the same visibility guarantees, except that a volatile field may be null. Unlike synchronized, volatile provides no mutual exclusion.

synchronized allows for more fine-grained locking. volatile, on the other hand, synchronizes on every access.

class Person(@volatile var name: String) {
  def set(changedName: String): Unit = {
    name = changedName
  }
}
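
A typical use of volatile is a flag that one thread sets and another thread polls. Here is a minimal sketch under that assumption; Worker and startAndStop are names made up for this example.

```scala
object VolatileFlagDemo {
  class Worker extends Runnable {
    // Without @volatile, the loop below is not guaranteed to see the update.
    @volatile private var running = true

    def stop(): Unit = { running = false }

    def run(): Unit = {
      while (running) Thread.sleep(1) // poll the flag until told to stop
    }
  }

  // Returns true if the worker thread exited after stop() was called.
  def startAndStop(): Boolean = {
    val worker = new Worker
    val thread = new Thread(worker)
    thread.start()
    worker.stop()
    thread.join(5000) // wait up to 5 seconds for the loop to exit
    !thread.isAlive
  }
}
```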

AtomicReference

Java 5 also added a whole series of low-level concurrency primitives. The AtomicReference class is one of them.

import java.util.concurrent.atomic.AtomicReference

class Person(val name: AtomicReference[String]) {
  def set(changedName: String): Unit = {
    name.set(changedName)
  }
}
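
Beyond plain set and get, AtomicReference offers compareAndSet, which only writes if the current value is still the one you expect. A small sketch follows; the rename helper and the sample names are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicReference

object AtomicRenameDemo {
  // Renames only if the current name is still `expected`; returns whether it
  // succeeded. compareAndSet compares references, not string contents.
  def rename(name: AtomicReference[String], expected: String, updated: String): Boolean =
    name.compareAndSet(expected, updated)

  def demo(): (Boolean, Boolean, String) = {
    val name = new AtomicReference[String]("alice")
    val first = rename(name, "alice", "bob")    // succeeds: value was "alice"
    val second = rename(name, "alice", "carol") // fails: value is now "bob"
    (first, second, name.get)
  }
}
```

This lets a thread detect that someone else changed the value in the meantime, without taking a lock.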

What's the cost?

AtomicReference is the most costly of these options, since you have to go through method dispatch to access the value.

volatile and synchronized are built into the JVM; synchronized relies on Java's built-in monitors, which cost very little when there is no contention. synchronized is often the best choice because it gives you finer-grained control over when you synchronize, which results in less contention.

When you enter a synchronized block, access a volatile reference, or dereference an AtomicReference, Java forces the processor to flush its cache lines, providing a consistent view of the data.

Please correct me if I'm wrong here. This is a complicated subject, and I'm sure it will take a lengthy classroom discussion to sort out.

Other neat tools from Java 5

As mentioned with AtomicReference, Java 5 brought many great tools with it.

CountDownLatch

A CountDownLatch is a simple mechanism for multiple threads to communicate with each other.

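import java.util.concurrent.CountDownLatch

val doneSignal = new CountDownLatch(2)
// doAsyncWork is assumed to start a task that calls doneSignal.countDown() when it finishes.
doAsyncWork(1)
doAsyncWork(2)

doneSignal.await()
println("both workers finished!")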

Among other things, it's great for unit tests. Say you're doing some asynchronous work and want to make sure that the function completes. Simply have your function count down the latch, and await it in the test.
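
A self-contained version of that idea might look like the sketch below. The doAsyncWork here is a hypothetical stand-in that sleeps briefly on a new thread and then counts the latch down; the timed await keeps a broken worker from hanging the test forever.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchDemo {
  // Hypothetical stand-in for doAsyncWork: does its work on a new thread,
  // then counts the latch down exactly once.
  def doAsyncWork(id: Int, done: CountDownLatch): Unit = {
    new Thread(new Runnable {
      def run(): Unit = {
        try Thread.sleep(10L * id) // pretend to work
        finally done.countDown()
      }
    }).start()
  }

  // Returns true if both workers finished within the timeout.
  def bothFinish(): Boolean = {
    val doneSignal = new CountDownLatch(2)
    doAsyncWork(1, doneSignal)
    doAsyncWork(2, doneSignal)
    doneSignal.await(5, TimeUnit.SECONDS)
  }
}
```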

AtomicInteger/Long

Since incrementing Ints and Longs is such a common task, AtomicInteger and AtomicLong were added.
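
As a quick sketch of why this matters, the following has several threads bump one AtomicInteger without any explicit locking. The object and method names are made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger

object AtomicCounterDemo {
  // Starts `threads` threads that each increment the counter `perThread` times.
  def countWith(threads: Int, perThread: Int): Int = {
    val counter = new AtomicInteger(0)
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        // incrementAndGet is a single atomic read-modify-write.
        def run(): Unit = for (_ <- 1 to perThread) counter.incrementAndGet()
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    counter.get
  }
}
```

A plain var with += 1 in the same position could lose updates, because the read and the write are separate steps.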

AtomicBoolean

I probably don't need to explain what this is.

ReadWriteLocks

ReadWriteLock lets you take separate reader and writer locks. Readers only have to wait while a writer holds the lock.
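
Here is a minimal sketch using ReentrantReadWriteLock, the standard implementation, to guard a mutable map. The Registry class is hypothetical and only meant to show where each lock is taken.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

object ReadWriteLockDemo {
  class Registry {
    private val lock = new ReentrantReadWriteLock()
    private val entries = mutable.HashMap.empty[String, Int]

    // Many readers may hold the read lock at the same time.
    def get(key: String): Option[Int] = {
      lock.readLock().lock()
      try { entries.get(key) }
      finally { lock.readLock().unlock() }
    }

    // The write lock is exclusive: it waits for readers and blocks new ones.
    def put(key: String, value: Int): Unit = {
      lock.writeLock().lock()
      try { entries(key) = value }
      finally { lock.writeLock().unlock() }
    }
  }

  def demo(): Option[Int] = {
    val registry = new Registry
    registry.put("alice", 1)
    registry.get("alice")
  }
}
```

This tends to pay off when reads greatly outnumber writes; with frequent writes it behaves much like a single mutex.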

Let's build an unsafe search engine

Below is a simple inverted index that is not thread-safe. Our inverted index maps parts of a name to a given User.

It is written naively, assuming only single-threaded access.

Note the alternative constructor this(), which supplies a mutable.HashMap by default.

import scala.collection.mutable

case class User(name: String, id: Int)

class InvertedIndex(val userMap: mutable.Map[String, User]) {

  def this() = this(new mutable.HashMap[String, User])

  def tokenizeName(name: String): Seq[String] = {
    name.split(" ").map(_.toLowerCase)
  }

  def add(term: String, user: User): Unit = {
    userMap += term -> user
  }

  def add(user: User): Unit = {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

I've left out how to get users out of the index for now. We'll add that later.

Let's make it thread-safe

In the inverted index example above, userMap is not guaranteed to be thread-safe. Multiple clients could try to add items at the same time and run into the same kind of visibility errors we saw in the earlier Person example.

Since userMap is not thread-safe, how do we make sure that only one thread at a time can mutate it?

You might consider locking userMap while adding.

def add(user: User): Unit = {
  userMap.synchronized {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

Unfortunately, this is too coarse. Always try to do as much expensive work as possible outside of the mutex. Remember when I said that locking is cheap if there is no contention? The less work you do inside the locked block, the less contention there will be.

def add(user: User): Unit = {
  // tokenizeName was measured to be the most expensive operation.
  val tokens = tokenizeName(user.name)

  tokens.foreach { term =>
    userMap.synchronized {
      add(term, user)
    }
  }
}

SynchronizedMap

We can mix synchronization into a mutable HashMap by using the SynchronizedMap trait.

We can extend the existing InvertedIndex to provide users with an easy way to build a synchronous index.

import scala.collection.mutable.SynchronizedMap

class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
  def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

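If you look at the implementation, you'll see that it simply synchronizes on every method, so while it is safe, it may not have the performance you're hoping for. Note also that SynchronizedMap was deprecated in Scala 2.11 and removed in 2.13, so this approach only works on older Scala versions.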

Java ConcurrentHashMap

Java comes with a nice thread-safe ConcurrentHashMap. Thankfully, JavaConverters gives us nice Scala semantics on top of it.

In fact, we can seamlessly layer our new thread-safe InvertedIndex as an extension of the old unsafe one.

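import java.util.concurrent.ConcurrentHashMap
// On Scala 2.13 and later, scala.jdk.CollectionConverters._ replaces JavaConverters.
import scala.collection.JavaConverters._

// asScala on a ConcurrentHashMap yields a scala.collection.concurrent.Map,
// which is itself a mutable.Map and so fits the InvertedIndex constructor.
class ConcurrentInvertedIndex(userMap: scala.collection.concurrent.Map[String, User])
    extends InvertedIndex(userMap) {

  def this() = this(new ConcurrentHashMap[String, User]().asScala)
}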
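
To see what ConcurrentHashMap gives you on its own, here is a small sketch that uses the Java API directly, with several threads writing at once. The object name and the key names are invented for this example.

```scala
import java.util.concurrent.ConcurrentHashMap

object ConcurrentMapDemo {
  // Four threads write to one map at once; returns the final number of keys.
  def distinctKeys(): Int = {
    val users = new ConcurrentHashMap[String, String]()
    val writers = (1 to 4).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          // putIfAbsent is atomic: only the first writer claims "shared".
          users.putIfAbsent("shared", "writer-" + i)
          // Each writer also adds a key of its own.
          users.put("own-" + i, "writer-" + i)
          ()
        }
      })
    }
    writers.foreach(_.start())
    writers.foreach(_.join())
    users.size
  }
}
```

No external locking is needed, and the map ends up with the one shared key plus one key per writer.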

Let's load InvertedIndex

The naive way

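import scala.io.Source

trait UserMaker {
  // Parses a "name,id" line into a User.
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

// The index to load into is passed in as a constructor argument.
class FileRecordProducer(path: String, index: InvertedIndex) extends UserMaker {
  def run(): Unit = {
    Source.fromFile(path, "utf-8").getLines().foreach { line =>
      index.add(makeUser(line))
    }
  }
}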

For each line in the file, we call makeUser and then add the result to our InvertedIndex. If we use a concurrent InvertedIndex, we can call add in parallel, and since makeUser has no side effects, it is already thread-safe.

We can't read the file in parallel, but we can build the User and add it to the index in parallel.

One solution: producer/consumer

A common pattern for asynchronous computation is to separate producers from consumers and have them communicate only through a queue. Let's see how that pattern applies to our search engine indexer.

import java.util.concurrent.{BlockingQueue, Executors, LinkedBlockingQueue}
import scala.io.Source

// Concrete producer: reads lines from a file and puts them on the queue.
class Producer(path: String, queue: BlockingQueue[String]) extends Runnable {
  def run(): Unit = {
    Source.fromFile(path, "utf-8").getLines().foreach { line =>
      queue.put(line)
    }
  }
}

// Abstract consumer
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
  def run(): Unit = {
    while (true) {
      // Blocks until an item is available.
      val item = queue.take()
      consume(item)
    }
  }

  def consume(x: T): Unit
}

val queue = new LinkedBlockingQueue[String]()

// One thread for the producer
val producer = new Producer("users.txt", queue)
new Thread(producer).start()

trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
  def consume(t: String): Unit = index.add(makeUser(t))
}

// The shared index must be thread-safe, since several consumers add to it.
val index = new ConcurrentInvertedIndex()

// Let's pretend we have 8 cores on this machine.
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

// Submit one consumer per core.
for (i <- 1 to cores) {
  pool.submit(new IndexerConsumer(index, queue))
}
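
The consumers above loop forever, so the pool never drains. One common way to let a consumer finish, which is not part of the original lesson, is to send a sentinel value (often called a poison pill) once the producer is done. The sketch below assumes a single consumer and a made-up sentinel string; with several consumers you would enqueue one sentinel per consumer.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

object PoisonPillDemo {
  // Hypothetical sentinel; it must never appear as a real line.
  val Done = "__DONE__"

  class CountingConsumer(queue: BlockingQueue[String]) extends Runnable {
    // Written only by the consumer thread; read by the caller after join().
    var consumed = 0

    def run(): Unit = {
      var item = queue.take()
      while (item != Done) {
        consumed += 1 // stand-in for real work such as index.add(makeUser(item))
        item = queue.take()
      }
    }
  }

  // Feeds the lines to one consumer and returns how many it processed.
  def demo(lines: Seq[String]): Int = {
    val queue = new LinkedBlockingQueue[String]()
    val consumer = new CountingConsumer(queue)
    val thread = new Thread(consumer)
    thread.start()
    lines.foreach(line => queue.put(line))
    queue.put(Done) // tell the consumer there is nothing more to come
    thread.join()
    consumer.consumed
  }
}
```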