// // GarbledOutput.java D. Parson January 2021 introductory example code // showing the basic Java multithreading approach for CSC543 class 1. import java.util.concurrent.CountDownLatch ; // CountDownLatch acts like the gate in a horse race. // All the threads must countDown() and await() the latch before // they are all released concurrently to run. The following two imports are // for writing to an output file. // UPDATED 1/27/2021 TO DEMO UNSAFE LinkedList vs THREAD-SAFE LISTS import java.util.* ; // For List interface and LinkedList class import java.util.concurrent.* ; // For thread-safe List subclasses. import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; import java.io.File ; import java.io.PrintWriter ; import net.jcip.annotations.* ; import java.util.Collections ; // Main class: public class GarbledOutput { @Immutable static class OrderedPair { public final long ThreadID, Counter ; OrderedPair(long ThreadID, long Counter) { this.ThreadID = ThreadID ; this.Counter = Counter ; } } // Helper class Threader is an active class, where each such object is // intended to run a thread of execution (a.k.a. instruction stream) // in its run() method. java.lang.Runnable is just an interface declaring // that any subclass must supply the public run() method. public static class Threader implements java.lang.Runnable { // Use private whenever possible to hide data. // Use final whenever possible to guarantee no mutation. // Use volatile *only as necessary* to ensure mutable built-in // data types get initialized & updated so other threads can see // the updates. The thread that runs run() is different than the main // thread that calls the Threader constructor. However in this case, // start() never starts run()'s thread until after this object is // constructed, and start() implements a memory barrier in the Java // Memory Model (we will discuss), so counter need not be volatile. // Volatile variables get flushed to main memory on every update, // and read into registers repeatedly, incurring overhead. // Volatile is not needed here because start() comes between // construction and run(). // BETTER TO PUT IT IN run() AS LOCAL VAR private int counter = 0 ; private final int bufferBound = 5 ; // see spinlock below. private final CountDownLatch mylatch ; @GuardedBy("writer") private final PrintWriter writer ; @GuardedBy("writer") private final List mylist ; // shovel counters into here private final AtomicInteger deathcounter ; private final AtomicBoolean spinlock ; // see synchronized below private final Thread mainthread ; private final int numthreads ; public Threader(PrintWriter writer, CountDownLatch mylatch, List mylist, AtomicInteger deathcounter, AtomicBoolean spinlock, Thread mainthread, int numthreads) { this.writer = writer ; this.mylatch = mylatch ; this.mylist = mylist ; this.deathcounter = deathcounter ; this.spinlock = spinlock ; this.mainthread = mainthread ; this.numthreads = numthreads ; } public void run() { int counter = 0 ; // System.err.println("START THREAD " // + Thread.currentThread().getId()); // System.err.flush(); // I leave debugging prints commented until I no longer need them. try { // The CountDownLatch is a demo kludge just to force the run() // threads to indeed run concurrently. mylatch.countDown(); // Decrement count at the "starting gate" mylatch.await(); // Wait for all the threads before proceeding. } catch (InterruptedException xxx) { // Blocking calls like await() may throw InterruptedException // if they receive an interrupt signal from another thread in // the same process, usually as a signal to initiate worker // thread termination. This demo is not using interrupts, // but the compiler still forces a catch due to await(). } // Without some form of synchronization, the threads will // intermix their output. synchronized(writer) uses the lock // built into the writer object -- every Java object has a // built-in lock -- to serialize execution of the following // block of code, where "serialize" means one-thread-at-a-time // execution. synchronized methods and blocks are Java's // implementation of the "monitor" concept. All threads must // synchronize on the same object-lock. // // Update 2/24/2021 -- The synchronized section used to limit // one thread at a time to writing, but since adding the intrinsic // condition variable's wait() for the consumer (main thread) to // make space, other writers may get in during the wait. So I // am adding an atomic spin lock on 2/26/2021. while (! spinlock.compareAndSet(false, true)) { // spin until requesting thread gets the spin lock } try { synchronized(writer) { while (++counter < 20) { // mylist.add(Thread.currentThread().getId()); // mylist.add((long)counter); // CONDITION VARIABLE (IMPLICIT) while (mylist.size() >= bufferBound) { try { // wait on condition variable until main() drains // mylist. writer.wait(); } catch (InterruptedException xxx) { } } writer.println("" + counter + " " + Thread.currentThread().getId()); mylist.add(new OrderedPair(Thread.currentThread().getId(), (long)counter)); writer.notifyAll(); } } } finally { spinlock.set(false); // release spinlock } try { Thread.sleep(10); // try to delay so main thread gets ahead. } catch (InterruptedException wonthappen) { } if (deathcounter.incrementAndGet() == numthreads) { mainthread.interrupt(); } } } public static void main (String [] args) { // Make a practice of hiding any variable you can as a // local variable or function parameter. That way it is accessible // only within the thread running that function. Each thread // gets its own set of local variables & parameters. // Note, though, that local variables may point to shared objects, // which is the case here. We will discuss how "final" variables, // similar to C++ "const", enhance thread safety by disallowing // mutation of such variables by sharing threads. final int numthreads = 10 ; PrintWriter Gwriter = null ; CountDownLatch latch = null ; AtomicInteger deathcounter = new AtomicInteger(0); AtomicBoolean spinlock = new AtomicBoolean(false); Thread mainthread = Thread.currentThread(); try { // Gwriter = new PrintWriter("junk.txt"); // demo output Gwriter = new PrintWriter("junk.txt"); // demo output latch = new CountDownLatch(numthreads); Thread [] runners = new Thread [ numthreads ]; List intlist = new LinkedList(); // List intlist = Collections.unmodifiableList( // new LinkedList()); // List intlist = Collections.synchronizedList( // new LinkedList()); // List intlist = new CopyOnWriteArrayList(); // List intlist = new Vector(); // First loop builds the active Threader objects. for (int i = 0 ; i < runners.length ; i++) { // Threader is an active object whose run() method // runs its thread. The java.lang.Thread object wrapped // around it initiates and manages the thread. runners[i] = new Thread(new Threader(Gwriter, latch, intlist, deathcounter, spinlock, mainthread, numthreads)); } // Second loop starts threads running in their run() methods. // start() implements a memory barrier -- To Be Defined. for (int i = 0 ; i < runners.length ; i++) { runners[i].start(); } synchronized(Gwriter) { while (deathcounter.get() < numthreads || intlist.size() > 0) { OrderedPair op = null ; while (intlist.size() < 1 && deathcounter.get() < numthreads) { try { // intlist is a bounded buffer, fluctuates from // 0 to bufferBound. Gwriter.wait(); // Emre's solution is to make above a // timed wait, so if final writer dies // after reader enters the wait(), // reader won't wait forever. // See GarbledOutputTimedWait on course page // // Chris suggests sending interrupt() // from writer to reader just when // writer is about to terminate, in case // reader is sitting in wait(). // This version uses the interrupt. // See GarbledOutputInterrupted on course page. } catch (InterruptedException xxx) { System.err.println("Gwriter.wait() interrupted"); } } op = (intlist.size() > 0) ? intlist.remove(0) : null; Gwriter.notifyAll(); if (op != null) { System.err.println("MAIN SEES " + op.ThreadID + "," + op.Counter); } } } // In the third loop the main thread -- this thread -- waits // for them to terminate when their run() activations return. // join() implements a memory barrier -- To Be Defined. for (int i = 0 ; i < runners.length ; i++) { try { runners[i].join(); } catch (InterruptedException iex) { // Impossible if not sent from another thread. } } } catch (java.io.FileNotFoundException fxx) { // new PrintWriter declares it may throw this exception. System.err.println("ERROR, FILE NOT FOUND " + fxx); } finally { // finally runs regardless of whether exceptions thrown or not. if (Gwriter != null) { // If file opened OK. Gwriter.close(); // Flush output buffer to output file. } } } }