Coding With Fun

How Java implements coroutines


May 10, 2021 Java



Coroutines go by many names: some people like to call them fibers, others green threads. The most intuitive way to describe a coroutine is as a "thread within a thread". It sounds a bit like a tongue twister, but that is essentially what it is.

The core idea of a coroutine is that it takes charge of scheduling around blocking operations. When it hits a blocking operation, it immediately gives up control and records the data on its current stack. As soon as the blocking operation completes, it finds a thread, restores the stack on that thread, and continues running with the result of the blocking call. The code therefore looks no different from ordinary synchronous code. This whole mechanism is what we call a coroutine, and the unit that runs on threads under the coroutine scheduler is called a Fiber.
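To make "give up control, keep the state, resume later" concrete, here is a minimal hand-written sketch in plain Java with no library involved. It only illustrates the idea. The Task interface, the Counter class and the round-robin loop are my own assumptions, not Quasar APIs. Each task keeps its "stack" (here just a loop counter) in fields, yields by returning from step(), and is resumed later by the scheduler, so several tasks take turns on a single thread. In a real coroutine library the yield point would be a blocking operation rather than every loop iteration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class RoundRobin {
    // Hypothetical resumable task: step() runs until the next yield point
    // and returns true once the task has finished.
    interface Task {
        boolean step();
    }

    // A counting task whose "stack" (the loop variable i) lives in a field,
    // so it survives between calls to step().
    static final class Counter implements Task {
        private final String name;
        private final int limit;
        private final List<String> log;
        private int i = 0; // saved local state

        Counter(String name, int limit, List<String> log) {
            this.name = name;
            this.limit = limit;
            this.log = log;
        }

        @Override
        public boolean step() {
            log.add(name + i); // one unit of work
            i++;
            return i >= limit; // returning = yielding control to the scheduler
        }
    }

    // Runs every task on the calling thread, resuming each in turn until all finish.
    static void runAll(Deque<Task> ready) {
        while (!ready.isEmpty()) {
            Task t = ready.pollFirst();
            if (!t.step()) {
                ready.addLast(t); // not finished: queue it to be resumed later
            }
        }
    }

    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        Deque<Task> ready = new ArrayDeque<>();
        ready.add(new Counter("a", 3, log));
        ready.add(new Counter("b", 2, log));
        runAll(ready);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a0, b0, a1, b1, a2]
    }
}
```

The two tasks interleave on one thread. A coroutine library automates this kind of manual state saving for you.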


Implementing coroutines in Java

Early on, coroutines on the JVM were usually implemented with Kilim, but that tool has not been updated for a long time. Today the most common choice is Quasar, and this article is based entirely on Quasar.

Below, I try to implement Go-style coroutines and channels with Quasar.

For a clear comparison, here is the Go version first: an example that computes the squares of the natural numbers below 10, with each stage running in its own goroutine.

package main

import "fmt"

func counter(out chan<- int) {
  for x := 0; x < 10; x++ {
    out <- x
  }
  close(out)
}

func squarer(out chan<- int, in <-chan int) {
  for v := range in {
    out <- v * v
  }
  close(out)
}

func printer(in <-chan int) {
  for v := range in {
    fmt.Println(v)
  }
}

func main() {
  // define two channels of type int
  naturals := make(chan int)
  squares := make(chan int)

  // start two Fibers (goroutines) with the go keyword
  go counter(naturals)
  go squarer(squares, naturals)
  // read and print the results
  printer(squares)
}

In the example above, the two sides share data through a channel, which decouples them. You can think of this channel as a SynchronousQueue in Java. Below is the Quasar version of the Java code, an almost line-for-line copy of the Go code.

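// Requires Quasar on the classpath, plus its Java agent (or ahead-of-time instrumentation) at run time.
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;

import java.util.concurrent.ExecutionException;

public class Example {

  private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException {
    Integer v;
    // receive() returns null once the channel is closed and drained
    while ((v = in.receive()) != null) {
      System.out.println(v);
    }
  }

  public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
    // Define two channels (-1 = unbounded buffer, unlike Go's unbuffered make(chan int))
    Channel<Integer> naturals = Channels.newChannel(-1);
    Channel<Integer> squares = Channels.newChannel(-1);

    // Run the two stages as Fibers.
    // counter: send 0..9, then close the channel
    new Fiber<Void>(() -> {
      for (int i = 0; i < 10; i++)
        naturals.send(i);
      naturals.close();
    }).start();

    // squarer: square every value until naturals is closed
    new Fiber<Void>(() -> {
      Integer v;
      while ((v = naturals.receive()) != null)
        squares.send(v * v);
      squares.close();
    }).start();

    // printer runs on the main thread
    printer(squares);
  }
}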

By comparison, the Java version looks more verbose. That is simply Java's style, and it still relies on a third-party library.
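For comparison only, here is a sketch of the same pipeline built with nothing but the JDK. It is not Quasar: each stage is an ordinary OS thread, and java.util.concurrent.SynchronousQueue plays the role of Go's unbuffered channel. BlockingQueue has no close(), and SynchronousQueue rejects null, so the DONE sentinel is an assumption of this sketch that stands in for closing the channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueuePipeline {
    // End-of-stream marker (assumption of this sketch): replaces channel close().
    private static final Integer DONE = Integer.MIN_VALUE;

    public static List<Integer> run() {
        BlockingQueue<Integer> naturals = new SynchronousQueue<>();
        BlockingQueue<Integer> squares = new SynchronousQueue<>();

        // counter: put() blocks until the squarer takes the value
        Thread counter = new Thread(() -> {
            try {
                for (int x = 0; x < 10; x++) {
                    naturals.put(x);
                }
                naturals.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // squarer: square values until the sentinel arrives, then forward it
        Thread squarer = new Thread(() -> {
            try {
                for (Integer v = naturals.take(); !v.equals(DONE); v = naturals.take()) {
                    squares.put(v * v);
                }
                squares.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        counter.start();
        squarer.start();

        // printer stage, collected into a list so the result can be checked
        List<Integer> results = new ArrayList<>();
        try {
            for (Integer v = squares.take(); !v.equals(DONE); v = squares.take()) {
                results.add(v);
            }
            counter.join();
            squarer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println); // prints 0, 1, 4, ..., 81, one per line
    }
}
```

This is fine for three stages. However, every stage occupies a full OS thread, and that cost is what Fibers are meant to avoid.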

By now you are probably curious about Fiber, and perhaps you doubt whether it really works as described above. Let's test it by creating a million Fibers with Quasar and seeing how much memory they consume. First, for comparison, I'll try to create a million Threads.

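public class MillionThreads {
  public static void main(String[] args) {
    // Each Thread is backed by a native OS thread with its own stack
    for (int i = 0; i < 1_000_000; i++) {
      new Thread(() -> {
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }).start();
    }
  }
}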

As expected, this fails right away with Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread. Here's how to build a million Fibers with Quasar.

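import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.Strand;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class MillionFibers {
  public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
    int FiberNumber = 1_000_000;
    // Never counted down: keeps main (and the JVM) alive so memory can be inspected
    CountDownLatch latch = new CountDownLatch(1);
    AtomicInteger counter = new AtomicInteger(0);

    for (int i = 0; i < FiberNumber; i++) {
      new Fiber<Void>(() -> {
        // Use incrementAndGet()'s return value so exactly one Fiber prints "done"
        if (counter.incrementAndGet() == FiberNumber) {
          System.out.println("done");
        }
        // Park this Fiber for a long time so that all Fibers stay alive
        Strand.sleep(1000000);
      }).start();
    }
    latch.await();
  }
}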

I added a latch here so that the program does not exit as soon as it finishes creating the Fibers. Strand.sleep is the counterpart of Thread.sleep, but it works for Fibers: it suspends the current Fiber instead of blocking an OS thread.

The console eventually prints done, showing that the program created a million Fibers. The long sleep keeps every Fiber alive, which makes the memory footprint easy to measure. According to the official documentation, an idle Fiber takes about 400 bytes, so a million Fibers should need about 400 MB of heap. Measured with jmap -heap <pid>, however, the heap used here was about 1000 MB, which means each Fiber takes roughly 1 KB.
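The measurement above uses jmap. If you would rather get a rough figure from inside the JVM, something like the following sketch can work. It is my own approximation, not a Quasar feature. It reads Runtime.totalMemory() minus freeMemory() after a System.gc() hint, and since that call is only a hint, treat the result as an estimate. To measure Fibers, you would pass a factory that creates and starts a Fiber; that part needs Quasar and is left out, so the byte[] factory in main is only a stand-in workload.

```java
import java.lang.ref.Reference;
import java.util.function.IntFunction;

class HeapEstimate {
    // Heap bytes in use right now. System.gc() is only a hint to the JVM,
    // so the result is an approximation, not an exact measurement.
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        System.gc();
        return rt.totalMemory() - rt.freeMemory();
    }

    // The arithmetic from the text: total heap growth divided by object count.
    static long bytesPerItem(long heapBytes, int count) {
        return heapBytes / count;
    }

    // Creates `count` objects via `factory`, keeps them all reachable while
    // measuring, and returns the approximate heap growth per object.
    static long estimateBytesPerItem(int count, IntFunction<Object> factory) {
        Object[] keep = new Object[count]; // allocated before the baseline is taken
        long before = usedHeapBytes();
        for (int i = 0; i < count; i++) {
            keep[i] = factory.apply(i);
        }
        long after = usedHeapBytes();
        Reference.reachabilityFence(keep); // nothing may be collected before this point
        return bytesPerItem(after - before, count);
    }

    public static void main(String[] args) {
        // ~1000 MB for 1,000,000 Fibers -> 1000 bytes, i.e. about 1 KB each
        System.out.println(bytesPerItem(1_000_000_000L, 1_000_000));
        // Stand-in workload: 10,000 byte arrays of 1 KB each (value varies by JVM)
        System.out.println(estimateBytesPerItem(10_000, i -> new byte[1024]));
    }
}
```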


How Quasar implements Fiber

In fact, Quasar implements coroutines in much the same way as Go. The difference is that Quasar does it as a library, while in Go they are a built-in feature of the language.

If you are familiar with Go's scheduling mechanism, you will find Quasar's easy to understand, because the two have a lot in common.

A Fiber in Quasar is essentially a continuation that is scheduled by a scheduler defined by Quasar. A continuation records the state of a running computation, so the computation can be suspended at any point and later resumed exactly where it was suspended.
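A continuation is easiest to picture when written out by hand. The sketch below is plain Java and only an analogy for what Quasar's instrumentation produces automatically. The names SumOfSquares, resume and budget, and the two-thread pool, are assumptions made for illustration. The loop's local variables live in fields. Each resume call runs a few iterations and then "suspends" by returning. The pool may run successive slices on different threads, yet each slice continues from the saved state.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ContinuationDemo {
    // Hand-written continuation for:
    //   long sum = 0; for (int i = 1; i <= n; i++) sum += i * i; return sum;
    // The locals i and sum live in fields, so the loop can stop after any
    // iteration and later continue exactly where it left off.
    static final class SumOfSquares {
        private final int n;
        private int i = 1;    // saved local variable
        private long sum = 0; // saved local variable

        SumOfSquares(int n) {
            this.n = n;
        }

        // Runs at most `budget` iterations, then "suspends" by returning.
        // Returns true once the whole loop has completed.
        boolean resume(int budget) {
            while (budget-- > 0 && i <= n) {
                sum += (long) i * i;
                i++;
            }
            return i > n;
        }

        long result() {
            return sum;
        }
    }

    // Resumes one continuation repeatedly on a thread pool. Future.get() makes the
    // previous slice's writes visible before the next slice is submitted.
    public static long runOnPool(int n, int budget) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            SumOfSquares k = new SumOfSquares(n);
            boolean done = false;
            while (!done) {
                Future<Boolean> slice = pool.submit(() -> k.resume(budget));
                done = slice.get(); // wait, so slices never overlap
            }
            return k.result();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPool(10, 3)); // 385
    }
}
```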

Quasar achieves this by modifying bytecode. So when you run a Quasar program, your code has to be instrumented at load time by a Java agent (the -javaagent JVM option); alternatively, the instrumentation can be done at compile time. Go has its own built-in scheduler, while Quasar uses ForkJoinPool, a work-stealing thread pool, by default. Work stealing matters because you cannot know in advance which Fiber will finish first. With work stealing, an idle thread can dynamically take work from other threads' queues, which keeps CPU utilization as high as possible.
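ForkJoinPool itself is part of the JDK, so its work-stealing behaviour can be tried without Quasar. The sketch below uses ordinary fork/join tasks, not Fibers. A forked subtask goes onto the forking worker's own deque, and an idle worker can steal it from there. RangeSum and THRESHOLD are illustrative names, not Quasar APIs.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class WorkStealingDemo {
    // Sums the half-open range [from, to) by splitting it into subtasks.
    static final class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // below this, compute directly
        private final long from;
        private final long to;

        RangeSum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long s = 0;
                for (long x = from; x < to; x++) {
                    s += x;
                }
                return s;
            }
            long mid = (from + to) >>> 1;
            RangeSum left = new RangeSum(from, mid);
            left.fork();                                  // queued on this worker; others may steal it
            long right = new RangeSum(mid, to).compute(); // keep working on the other half
            return right + left.join();
        }
    }

    public static long sum(long n, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new RangeSum(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1_000_000, 4)); // 499999500000
    }
}
```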

You may ask how Quasar knows which bytecode to modify. The answer is simple: at runtime, the Java agent scans for methods that can be suspended and inserts continuation logic before and after the calls to them. If you put the @Suspendable annotation on a method, Quasar treats the calls to that annotated method in this way.
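Quasar's agent does this on bytecode as classes are loaded, not through reflection. Still, the "find the methods carrying a marker annotation" step can be pictured with a small reflection-based analogy. MySuspendable below is a hypothetical annotation of my own, not Quasar's @Suspendable. A real instrumenting agent would rewrite the methods it finds instead of just listing their names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SuspendableScan {
    // Hypothetical marker standing in for Quasar's @Suspendable.
    // RUNTIME retention is needed so reflection can see it.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface MySuspendable {}

    // Example class to scan.
    static class Service {
        @MySuspendable
        void fetch() {}

        @MySuspendable
        void save() {}

        void log() {}
    }

    // Returns the sorted names of methods declared in `type` that carry the marker.
    static List<String> suspendableMethods(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(MySuspendable.class)) {
                names.add(m.getName());
            }
        }
        Collections.sort(names); // getDeclaredMethods() returns no particular order
        return names;
    }

    public static List<String> demo() {
        return suspendableMethods(Service.class);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [fetch, save]
    }
}
```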

Suppose method f is annotated with @Suspendable and calls method g, which carries the same annotation. Quasar then inserts extra bytecode around these calls, in f and in every method that calls f. This code records the state of the current Fiber's stack so that it can be restored later (like a thread, a Fiber has its own stack). Further down the call chain, Fiber.park is eventually called. It throws SuspendExecution to unwind the stack and stop execution, so that Quasar's scheduler can take over. This SuspendExecution is caught by the Fiber itself and should never be caught by business code. When the Fiber is woken up (the scheduler calls Fiber.unpark), f is called again and jumps straight to the point where it was suspended (the Fiber knows where that is). The result of the g call is delivered at f's resume point, so it looks as if g's return value were simply stored in one of f's local variables. This avoids nested callbacks.

That is a lot of detail. In short: find a way to unwind the running thread's stack so that Quasar's scheduler can step in.
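The unwind-and-resume idea can be imitated by hand in plain Java. In the sketch below, Suspend, FrameF and ready are hypothetical stand-ins for SuspendExecution, the saved Fiber stack and the condition being waited on. Quasar generates the equivalent bookkeeping through bytecode instrumentation, and its real implementation differs in detail. f records its resume point before calling g. g "parks" by throwing, and the toy scheduler catches the exception, "unparks", and calls f again with the saved frame.

```java
import java.util.Arrays;

class UnwindDemo {
    // Hypothetical stand-in for SuspendExecution. It only unwinds the stack,
    // so it skips the (expensive) stack-trace capture.
    static final class Suspend extends Exception {
        Suspend() {
            super(null, null, false, false);
        }
    }

    // Saved frame of f(): where to resume, plus f's local variable.
    static final class FrameF {
        int pc = 0;
        int lo;
    }

    // Hypothetical condition that g() is "blocked" on.
    private static boolean ready;

    // g() parks by throwing Suspend until its data is ready.
    static int g() throws Suspend {
        if (!ready) {
            throw new Suspend();
        }
        return 21;
    }

    // Hand-written equivalent of an instrumented f():
    //   int lo = 1; return lo * g() * 2;
    static int f(FrameF frame) throws Suspend {
        if (frame.pc == 0) {
            frame.lo = 1; // code before the call runs only once
            frame.pc = 1; // record the resume point before calling g()
        }
        // Resume point (pc == 1). If g() throws, frame keeps pc and lo intact.
        int r = g();
        return frame.lo * r * 2;
    }

    // Toy scheduler: call f(); on Suspend, "unpark" by making the data ready
    // and call f() again with the same saved frame.
    // Returns {result, number of times f() was entered}.
    public static int[] run() {
        ready = false;
        FrameF frame = new FrameF();
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                return new int[] {f(frame), attempts};
            } catch (Suspend s) {
                ready = true; // the event g() was waiting for has happened
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [42, 2]
    }
}
```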

On the JVM there are only two ways for running code to break out of its current method calls:

1. Throw an exception

2. Return

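Quasar usually relies on throwing an exception, which is why the code above declares throws SuspendExecution. If your own code catches this exception, suspension breaks. So when a @Suspendable method calls a method that declares throws SuspendExecution and the compiler forces you to handle it, the code is usually written like this: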

@Suspendable
public int f() {
  try {
    // do some stuff
    return g() * 2;
  } catch(SuspendExecution s) {
    // The exception should never actually be caught here.
    throw new AssertionError(s);
  }
}
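Why is catching it a problem? In plain-Java terms, an over-broad catch in business code swallows the exception that was meant to unwind the stack. The scheduler then never learns that the task wanted to suspend. The sketch below shows this with a hypothetical Suspend exception rather than Quasar's real classes. The "propagating" variant is only a generic illustration of letting the signal through, not Quasar's documented pattern.

```java
class SwallowDemo {
    // Hypothetical stand-in for a suspension exception such as SuspendExecution.
    static final class Suspend extends Exception {
        Suspend() {
            super(null, null, false, false);
        }
    }

    // Pretend the data is not ready yet, so this call asks to suspend.
    static void blockingCall() throws Suspend {
        throw new Suspend();
    }

    // Over-broad catch: the suspension request is mistaken for an error and lost.
    static String swallowing() {
        try {
            blockingCall();
            return "finished";
        } catch (Exception e) {
            return "error handled";
        }
    }

    // Lets the suspension signal through and handles only genuine errors.
    static String propagating() throws Suspend {
        try {
            blockingCall();
            return "finished";
        } catch (Suspend s) {
            throw s; // not business code's concern: hand it to the scheduler
        } catch (Exception e) {
            return "error handled";
        }
    }

    // Toy scheduler: reports whether the task actually asked to be suspended.
    public static String schedule(boolean useSwallowing) {
        try {
            return useSwallowing ? swallowing() : propagating();
        } catch (Suspend s) {
            return "suspended";
        }
    }

    public static void main(String[] args) {
        System.out.println(schedule(true));  // error handled
        System.out.println(schedule(false)); // suspended
    }
}
```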


Coroutines in Java - the Quasar Fiber implementation

Quasar Fiber uses bytecode modification to weave the necessary context save/restore code into classes at compile time or load time. A Fiber is paused by throwing an exception. It is resumed by restoring the JVM method call stack and local variables from the saved context (the Continuation). Quasar Fiber provides the corresponding Java class library; it is somewhat intrusive to the application, though only slightly.

Quasar Fiber consists mainly of three parts: Instrument, Continuation and Scheduler.

  • Instrument weaves code into methods, for example to save and restore the context before and after park
  • Continuation saves the state of method calls, such as local variables and references: a user-space stack. This is also the biggest difference from asynchronous frameworks such as Akka, which are based on fixed callback interfaces
  • Scheduler assigns Fibers to specific OS threads for execution


