Class DatagramReader

  • All Implemented Interfaces:
    java.lang.Cloneable, Actor, Executable, FiringsRecordable, Initializable, TypedActor, Changeable, Debuggable, DebugListener, Derivable, Instantiable, ModelErrorHandler, MoMLExportable, Moveable, Nameable

    public class DatagramReader
    extends TypedAtomicActor
    This actor reads datagram packets via a separate thread. The thread responds to datagrams whenever they arrive, giving the actor the ability to read the datagrams asynchronously. Datagrams are connectionless, open-loop internet communications. Each datagram packet contains data plus a return address. The return address consists of an IP address and a socket number. Datagrams use the UDP protocol under which no reply or confirmation is expected. This is in contrast to TCP which expects confirmations and attempts to deliver packets in order to the layer above TCP. This can result in long delays in the delivery of information across the network. Because UDP makes no such attempts, it never hangs and does not need to be timed out.

    NOTE: This actor has been developed to work in the Discrete Event (DE) and Synchronous Data Flow (SDF) domains. Use elsewhere with caution.

    NOTE: This actor has problems, the tests do not reliably pass. For details, see https://chess.eecs.berkeley.edu/bugzilla/show_bug.cgi?id=54.

    The simplest scenario has the thread constantly stalled awaiting a packet. When a packet arrives, the thread quickly queues it in one of the buffers of the actor, calls the getDirector().fireAtCurrentTime(), and then stalls again awaiting the next packet. By stalling again and again, the thread keeps the actor aware at all times of incoming packets. This is particularly important if packets come in more quickly than the model can process them. Depending on the domain (e.g. DE) in which this actor is used, the director may respond to the fireAtCurrentTime() call of the thread by calling the fire() method of the actor. In this case, fire() then broadcasts the data received, along with the return address and return socket number from which the datagram originated.

    The data portion of the packet is broadcast at the output port. The type of the output is always an array of bytes.

    The return address and socket number are broadcast as String and int respectively. These tell where the received datagram originated from.

    The behavior of the actor under less simple scenarios is governed by parameters of this actor. Additional packet(s) can arrive while the director is getting around to calling fire(). Conversely, the director may make extra calls to fire(), even before any datagrams have come in. I call these the eager packet and eager director scenarios respectively.

    Background: There are two packet buffers. The thread and the fire() method share these buffers and maintain consistency via synchronization on the object _syncFireAndThread. This synchronization prevents conflicts when accessing the shared buffers and when accessing the count of queued packets.

    The overwrite parameter applies to the eager packet scenario. Setting this parameter to true is useful in cases where it is possible for data to come in too fast for the model to process. This setting alleviates data gluts without undue loss of data when the model is able to keep up. When overwrite is set to true (the default), the actor discards the packet already received in favor of the new packet. If false, the new packet is queued behind the existing one. In the latter case, both buffers are now full. The thread then waits for fire() to consume a queued packet before it stalls again awaiting the next. In all other cases (overwrite true or no queued packets) the thread immediately stalls to await the next packet.

    The blockAwaitingDatagram parameter applies to the eager director case. This case comes up most often in SDF, where an actor is expected to block in fire until an output can be produced. If true, a call to fire() will block unless or until a datagram has arrived. If false, then fire() returns without waiting, using the defaultOutput parameter in place of real data. The returnAddress and returnSocketNumber ports have default outputs as well, but they are not parameter-programmable.

    NOTE: This actor has a parameter localSocketNumber for the port number assigned to its local datagram socket. Initially, the local socket number is set to 4004. There is no particular reason for choosing this number, except that is noticeable in the code and in Vergil, thus encouraging you to change it to any desired value in the range 0..65535. Note that socket numbers 0..1023 are generally reserved and numbers 1024 and above are generally available.

    Some commonly used port numbers (a.k.a. socket numbers) are shown below:

     Well-known Ports
     (Commonly Used Ports)
     7        (Echo)
     21        (FTP)
     23        (TELNET)
     25        (SMTP)
     53        (DNS)
     79        (finger)
     80        (HTTP)
     110        (POP3)
     119        (NNTP)
     161        (SNMP)
     162        (SNMP Trap)
     
    Reference: http://192.168.1.1/Forward.htm (A webpage hosted from within the Linksys BEFSR41 Cable/DSL Router)

    NOTE: This actor can also be configured to handle multicase datagram socket. A MulticastSocket is a DatagramSocket with additional capabilities to join groups of other multicast hosts on the internet. A multicast group is specified by a class D IP address and a standard UDP port number. When one member sends a packet to a multicast group, all recipients subscribing to that host and port receive the packet. Currently, The parameter defaultReturnAddress is overloaded to specify a multicast datagram IP address. When the return address is a multicast IP address, The parameter localSocketNumber is used to specify the UDP port number for the multicast group. A multicast IP address ranges from 224.0.0.0 to 239.255.255.255, inclusive. To send a packet to the group, the sender can be either a DatagramSocket or a MulticastSocket. The only difference is that MulticastSocket allows you to control the time-to-live of the datagram. Don't use 224.0.0.1 ~ 224.255.255.255 when the live time of is specified larger than 1.

    FIXME: we might not want to overload the defaultReturnAddress and the localSocketNumber parameter...

    Another useful tidbit is the command 'netstat'. This works in a DOS prompt and also in the UNIX-like Bash shell. In either shell, enter 'netstat -an'. This command shows current port allocations! Ports allocated to Ptolemy models are shown along with other port allocations. Other useful network commands include 'ping' and 'tracert'. Both TCP and UDP (datagram) ports are shown by netstat. FIXME: Find out whether a TCP port using a specific number blocks a UDP port from using that same number.

    Since:
    Ptolemy II 2.0
    Version:
    $Id$
    Author:
    Winthrop Williams, Joern Janneck, Xiaojun Liu, Edward Lee (Based on TiltSensor actor written by Chamberlain Fong, Xiaojun Liu, Edward Lee)
    Pt.AcceptedRating:
    Yellow (winthrop)
    Pt.ProposedRating:
    Yellow (winthrop)
    • Field Detail

      • returnAddress

        public TypedIOPort returnAddress
        This port outputs the IP address portion of the received datagram packet. The type of this output is String. This is the IP address of the remote datagram socket which sent the packet to the socket of this actor. Under IPv4, this string has the familiar form "128.32.1.1". This output defaults (when no datagram has been received and blocking is false) to the IP address of the socket.
      • returnSocketNumber

        public TypedIOPort returnSocketNumber
        This port outputs the socket (a.k.a port) number portion of the received datagram packet. The type of this output is int. This is the socket number of the remote datagram socket which sent the packet to this actor's socket. This is an integer in the range 0 through 65535. This output defaults (when no datagram has been received and blocking is false) to this actor's local socket number.
      • output

        public TypedIOPort output
        This port outputs the data portion of the received datagram packet. The type of output may depend on the datagram received, which may vary even during a single run of a model. The user is encouraged to play with the configuration of this port to best suit the need at hand.
      • trigger

        public TypedIOPort trigger
        The trigger input port reads and discards a token from each channel that has a token. The type of this port has been set to GENERAL, permitting any token type to be accepted. The hasToken() and get(int) methods are called on this input, but their contents are discarded. The presence of a connection to this input serves a purpose by causing the director to schedule the firing of this actor at an appropriate place in the sequence of firings of actors. This is particularly useful in the SDF domain. Without a trigger input, the SDF scheduler would be unable to schedule a firing of this actor unless it can be scheduled as the first actor to be fired. Thus, without this input, configurations in SDF would be limited. (@See ptolemy.actor.lib.Source for an archetypal trigger input.)
      • localSocketNumber

        public Parameter localSocketNumber
        This actor's local socket (a.k.a. port) number. This is a system resource allocated to this actor. No other actor with the same local socket number may run at the same time. Currently, When the return address is a multicast IP address, this parameter is also used to specify the UDP port number for the multicast group.
      • actorBufferLength

        public Parameter actorBufferLength
        Length (in bytes) of each of the actor's two packet buffers for receiving a datagram. This length does not include the bytes needed for storing the datagram's return address and other housekeeping information. This buffer need only be big enough to hold the payload (a.k.a. data portion) of the datagram.
      • platformBufferLength

        public Parameter platformBufferLength
        Length (in bytes) of the buffer within java and/or the platform layers below java. Java documents refers to all this collectively as the platform. The size of this buffer is controlled via the getReceiveBufferSize() and setReceiveBufferSize() methods. @see java.net.DatagramSocket. Caution #1 - The platform treats setReceiveBufferSize() as a suggestion only. It supposedly reports the actual buffer size granted in subsequent calls to getReceiveBufferSize(). However, my experiments with this showed it granting buffers as large as 2 gigabytes, with no apparent limit except the maximum representable integer value. Thus, I suggest taking this with a grain of salt. Caution #2 - the get/setReceiveBufferSize() calls block when called as long as another thread is in a receive() call on that same socket. This is undocumented in Java's documentation. Also note that the setReceiveBufferSize() method is not available in early JDK's, which makes it important to have setPlatformBufferLength set to false when generating code.
      • setPlatformBufferLength

        public Parameter setPlatformBufferLength
        Determine whether the platformBufferLength parameter will be used to set the platform's receive buffer size. This parameter must contain a boolean token, and has a default of false.
      • overwrite

        public Parameter overwrite
        Whether to overwrite when inundated with datagrams or let them pile up. Default is true. If false, datagrams will queue up (mostly in the platform, some in the actor). The datagram used at each invocation of fire will be the oldest in the queue. On the other hand, if overwrite is true, then minimal queuing will occur and the most recent data will be used when fire() is called. Older data will be discarded.
      • blockAwaitingDatagram

        public Parameter blockAwaitingDatagram
        Whether to block in fire(). If fire() is called before the datagram has arrived, the actor must either block awaiting the datagram or use its defaultOutput. This blocking parameter controls which choice fire() will make. This parameter is useful for SDF models, where it is generally set to true. It has no effect in DE models unless the trigger input has been connected. Type is Boolean. Default value is true.
      • defaultReturnAddress

        public Parameter defaultReturnAddress
        The default for the returnAddress output. This token is broadcast when the actor is fired, but no actual datagram is available to broadcast and blockAwaitingDatagram is false. If blocking were true, the actor would simply stall in fire() until a datagram arrives. Type is string. Default value is "localhost". Currently, this parameter can be overloaded to specify a multicast datagram IP address. A multicast IP address ranges from 224.0.0.0 to 239.255.255.255, inclusive.
      • defaultReturnSocketNumber

        public Parameter defaultReturnSocketNumber
        The default the returnSocketNumber output. This token is broadcast when the actor is fired, but no actual datagram is available to broadcast and blockAwaitingDatagram is false. If blocking were true, the actor would simply stall in fire() until a datagram arrives. Type is integer. Default value is 0.
      • defaultOutput

        public Parameter defaultOutput
        The default for the output output. This default token is broadcast when the actor is fired, but no actual datagram data is available to broadcast and blockAwaitingDatagram is false. If blocking were true, the actor would simply stall in fire() until a datagram arrives. Type is defined by the expression entered. Default type and value is the integer 0.
    • Method Detail

      • attributeChanged

        public void attributeChanged​(Attribute attribute)
                              throws IllegalActionException
        React to a change of the given attribute. Generally, this is method called between firings of an actor. However, this actor contains a separate thread of which the director is not aware. Thus, any time the model is running, calls to this method typically happen while the thread is running. Furthermore, the thread spends most of its time blocked in the DatagramSocket.receive() method or (if overwrite is false) waiting for fire() to notify it that space is available to receive another packet. This method has been architected to, when possible, permit prompt changes (such as to the socket number being monitored for incoming datagrams), while at the same time maintaining consistency.
        Overrides:
        attributeChanged in class NamedObj
        Parameters:
        attribute - The attribute that changed.
        Throws:
        IllegalActionException - If the change is not allowed.
      • fire

        public void fire()
                  throws IllegalActionException
        Broadcast a received datagram, or block awaiting one, or broadcast default values. Broadcast a return address and a return socket number along with the net data payload contents of the datagram. Conversion of the payload data into a token is handled in a variety of ways, according to the setting of the encoding parameter. The return address and return socket number are always broadcast as a string and an integer respectively.
        Specified by:
        fire in interface Executable
        Overrides:
        fire in class AtomicActor<TypedIOPort>
        Throws:
        IllegalActionException - If the data cannot be converted into a token of the same type as the configured type of the output port.
      • initialize

        public void initialize()
                        throws IllegalActionException
        Initialize this actor, including the creation of an evaluation variable for the Ptolemy parser, a DatagramSocket for receiving datagrams, and a SocketReadingThread for blocking in the DatagramSocket.receive() method call. This method is used as a bookend with wrapup() being the other end. Resources created/allocated here are released in wrapup().
        Specified by:
        initialize in interface Initializable
        Overrides:
        initialize in class AtomicActor<TypedIOPort>
        Throws:
        IllegalActionException - If the localSocketNumber parameter has a value outside 0..65535 or a socket could not be created.
      • stopFire

        public void stopFire()
        Stop the fire() method, but only if it is blocked. The actor returns from fire(), with its state the same as before fire() was called. Thus, when the manager is ready, it may call fire() again and the actor will start again in a consistent state. The fire() method uses a wait() call to block. If fire() not currently blocked, it is permitted to continue. This could be before or after the wait() call. If it has passed the wait() statement, it will complete. However, if it has not yet reached the wait() statement, it will block anyway. Thus, when pausing or stopping execution, it will on rare occasion be necessary to press 'pause' or 'stop' a second time. I tried to clean this up, so that one, and exactly one, stopFire() call would suffice. These experiments and some thought about them are discussed below. However, the bottom line is that any approach which stopped fire() in each case where it ought to be stopped, also stopped it in some additional cases where it ought not be stopped. This is a very interesting predicament! One could say the problem in an incompatibility of system-level types. The type provided to the actor collectively by the Manager, CompositeActor, and Director is a call-return type of interface. That is, stopFire() is called, and it is expected to return. However, what may be needed is an interrupt type of interface. One difference is that an interrupt comes with a flag that can be tested via the interrupted() and isInterrupted() methods. Such a flag would solve the immediate problem with stopFire(). However, I wonder how we could be habitually applying the theory of system-level types and/or interface automata theory here and elsewhere? Could we be building clearer designs from the get go? Could the designs self modify or type-check themselves to maintain consistency as changes are made? I am intrigued by the possibilities! I did a bit of nosing around and discovered how stopFire() gets called. The Manager initiates the call on the CompositeActor. It then calls stopFire() on the Director. The Director fans out the call to every actor below it. The director/manager insists on calling fire() after every call of prefire(). Even when it has issued a stopFire() during prefire(), it persists, executing the very fire() it is trying to stop! This behavior makes it impossible for the actor to cover all the cases where it ought to stop. See detailed discussion below. Fortunately, multiple presses of the 'pause' and 'stop' buttons result in multiple stopFire() calls. By the time a user can click the mouse a second time, fire() will have blocked if it is going to do so. It can then be stopped as expected. FIXME: There exists a circumstance where stopFire() could fail to stop the fire() method, and the fire() method could block indefinitely. This occurs if stopFire() is called after fire() is called (or after the director commits itself to calling fire()) but before fire() enters the synchronized section in which it blocks with a wait() call. In theory, fire() should avoid this by testing a flag before waiting. Such a flag could be set by stopFire(). However, there is no way for the actor to tell whether the flag was set during (or just before) this firing (in which case it ought to be obeyed), or if it was set during the completion of the last firing or when the actor was not being fired at all (in which case it ought to be ignored). What is needed is a flag which is set when stopFire() is called, and cleared before the director commits to calling fire(). Perhaps prefire() could serve to clear the flag! This assumes that, upon resuming execution, prefire() is repeated before fire() is reentered.-[Tested; assumption holds.] It also assumes that superfluous calls to stopFire() do not occur during prefire() or between prefire() and fire().-[Tested; assumption holds with only known superfluous call, the one when the user presses 'Go'] Additionally, this assumes that intentional calls to stopFire(), if they occur before or during the [pre]initialize call, are backed up by a test which prevents the director from calling fire() if it has already called stopFire() with the intention of stopping that same fire().-[Tested; assumption does not hold. I caused stopFire() to be called during prefire(). The director went ahead and called fire() anyway!] If the director/manager need to be fixed anyway, perhaps the stopFire paradigm ought to be rearchitected to incorporate a flag. (Does it already and I just don't know about it?)
        Specified by:
        stopFire in interface Executable
        Overrides:
        stopFire in class AtomicActor<TypedIOPort>
      • stop

        public void stop()
        Request that execution of the current iteration stop as soon as possible. Wake up the manager thread if it is blocking on fire() of this actor.
        Specified by:
        stop in interface Executable
        Overrides:
        stop in class AtomicActor<TypedIOPort>
      • wrapup

        public void wrapup()
                    throws IllegalActionException
        Release resources acquired in the initialize() method, specifically the evaluation variable, the DatagramSocket, and the SocketReadingThread. This method also gets called from this actor's setContainer() method. This insures that when the actor is removed from a running simulation, locked resources are released. Since the thread blocks in receive() on the DatagramSocket, and since this method does not respond to an interrupt() call on its thread, the close() method is used on the DatagramSocket to break the thread out of the receive() call. Because attributeChanged() also (temporarily) closes the DatagramSocket, wrapup() additionally makes the DatagramSocket null. This signals the thread to terminate.
        Specified by:
        wrapup in interface Initializable
        Overrides:
        wrapup in class AtomicActor<TypedIOPort>
        Throws:
        IllegalActionException - Not thrown in this base class.