Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How would i send messages to an exchange? #21

Open
danammeansbear opened this issue Oct 20, 2022 · 2 comments
Open

How would i send messages to an exchange? #21

danammeansbear opened this issue Oct 20, 2022 · 2 comments

Comments

@danammeansbear
Copy link

I basically want to be able to send messages to the queue or exchange that goes into moving the cubes.

I have the following code written for a c# console app but how would i convert this to your unity code?

  foreach (string greenhouselist in Greenhouse.GreenhouseList)
                {
                    channel.QueueDeclare($"{greenhouselist}Queue",
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

                    //channel.ExchangeDeclare("amq.topic", ExchangeType.Fanout);


                    var Count = 0;

                    foreach (string Sensor in Sensor.SensorList)
                    {
                        //var message = new { Name = "Plant Event", Location = greenhouselist, Sensor = Sensor, Saverity = "high", Message = $"Hello! Water me!! Count:{Count}" };
                        //var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                        //// string query = "INSERT INTO dbo.greenhouse.message (Name,Price,Date) VALUES('LED Screen','$120','27 January 2017')";
                        ////SqlCommand cmd = new SqlCommand(query, con);
                        //channel.BasicPublish("", $"{greenhouselist}Queue", null, body);
                        foreach (string cube in cube.cubeList)
                        {
                            Random random = new Random();
                            //{ "id":"cube2" "rotY":45 }
                            //{ "id":"cube2", "rotY":45 }
                            //[{ "id":"cube1", "rotX":45 }, { "id":"cube2", "rotY":45 }, { "id":"cube3", "rotZ":45 }]
                            var message1 = @"{ ""id"":""cube2"", ""rotY"":45 }";
                            var message = @"[{ ""id"":""cube2"", ""rotY"":45 }, { ""id"":""cube1"", ""rotX"":90, ""rotY"":23 }, { ""id"":""cube3"", ""rotZ"":29 }]";
                            var message2 = System.Text.RegularExpressions.Regex.Replace(message, "[@\\.;'\\\\]", "");
                            //var message2 = message.Replace(@"\", string.Empty);
                            //var message =  new { id = "Plant Event", rot = greenhouselist, Sensor = Sensor};
                            var body2 = Encoding.UTF8.GetBytes(message2);
                            
                            channel.BasicPublish(exchange: "amq.topic",
                                                 routingKey: "amqpdemo.objects",
                                                 basicProperties: null,
                                                 body: body2);
                     
                            Count++;
                            Thread.Sleep(1000);
                        }
                        Count++;
                        Thread.Sleep(1000);
                    }
       

    }
            }
@danammeansbear
Copy link
Author

`using System.Collections.Generic;
using UnityEngine;
using UnityEngine.UI;
using CymaticLabs.Unity3D.Amqp.SimpleJSON;

namespace CymaticLabs.Unity3D.Amqp.UI
{
///


/// Performs UI logic for the demo AMQP connection form.
///

public class SensorScript : MonoBehaviour
// public class AmqpConnectionForm : MonoBehaviour
{

    [Tooltip("The name of the exchange to subscribe to.")]
    public string ExchangeName;

    [Tooltip("The exchange type for the exchange being subscribed to. It is important to get this value correct as the RabbitMQ client will close a connection if you pass the wrong type for an already declared exchange.")]
    public AmqpExchangeTypes ExchangeType = AmqpExchangeTypes.Topic;

    [Tooltip("The optional routing key to use when subscribing to the exchange. This mostly applies to 'topic' exchanges.")]
    public string RoutingKey;
    [Tooltip("When enabled received messages will be logged to the debug console.")]
    public bool DebugLogMessages = false;
    #region Fields

    // Internal look-up table of object references given their AMQP ID
    private Dictionary<string, AmqpObjectControlReference> objectsById;

    #endregion Fields

    #region Properties

    /// <summary>
    /// Gets the static, singleton instance of the behaviour.
    /// </summary>
    public static SensorScript Instance { get; private set; }

    #endregion Properties
    #region Methods
    private void Awake()
    {
        // Set static instance
        Instance = this;

        // Initialize the object/id look-up table
        objectsById = new Dictionary<string, AmqpObjectControlReference>();
    }

    // *Note*: Only interact with the AMQP library in Start(), not Awake() 
    // since the AmqpClient initializes itself in Awake() and won't be ready yet.
    private void Start()
    {
        // Create a new exchange subscription using the inspector values
        var subscription = new AmqpExchangeSubscription(ExchangeName, ExchangeType, RoutingKey, HandleExchangeMessageReceived);

        /*
         * Add the subscription to the client. If you are using multiple AmqpClient instances then
         * using the static methods won't work. In that case add a inspector property of type 'AmqpClient'
         * and assigned a reference to the connection you want to work with and call the 'SubscribeToExchange()'
         * non-static method instead.
         */
        AmqpClient.Subscribe(subscription);
    }
    private void Update()
    {
        Publish();
    }
    public void Publish()
    {
        // Validate args
        var isValid = true;

        //var exchangeName = PublishExchange.options[PublishExchange.value].text;
        var exchangeName = "amq.topic";

        if (string.IsNullOrEmpty(exchangeName))
        {
            isValid = false;
            AmqpConsole.Color = Color.red;
            AmqpConsole.WriteLine("* Exchange Name cannot be blank");
            AmqpConsole.Color = null;
        }

        var message = "insert some sort of message here probably json or xml";

        if (string.IsNullOrEmpty(message))
        {
            isValid = false;
            AmqpConsole.Color = Color.red;
            AmqpConsole.WriteLine("* Message cannot be blank");
            AmqpConsole.Color = null;
        }

        // Don't continue if values are invald
        if (!isValid) return;

        var exchangeType = AmqpExchangeTypes.Direct;

        // Find this exchange and get its exchange type
        foreach (var exchange in exchanges)
        {
            if (exchange.Name == exchangeName)
            {
                exchangeType = exchange.Type;
                break;
            }
        }

        var routingKey = "amqpdemo.objects";

        // Publish the message
        AmqpClient.Publish(exchangeName, routingKey, message);
        //PublishMessage.text = null; // clear out message

        // Refocus the message area
        //PublishMessage.Select();
        //PublishMessage.ActivateInputField();
    }
    /// <summary>
    /// Registers a new AMQP object control reference with the controller.
    /// </summary>
    /// <param name="objRef">The object control reference to register.</param>
    public void RegisterObject(AmqpObjectControlReference objRef)
    {
        if (objRef == null) throw new System.ArgumentNullException("objRef");

        // Ensure this reference has been filled out properly
        if (string.IsNullOrEmpty(objRef.AmqpId))
        {
            Debug.LogWarningFormat("AMQP Control Object Reference is missing its ID: {0}", objRef.name);
            return;
        }

        // Add new
        if (!objectsById.ContainsKey(objRef.AmqpId))
        {
            objectsById.Add(objRef.AmqpId, objRef);
        }
        // Replace, but warn
        else
        {
            Debug.LogWarningFormat("AMQP Control Object Reference with ID has already been registered: {0}", objRef.AmqpId);
            objectsById[objRef.AmqpId] = objRef;
        }

        if (DebugLogMessages)
            Debug.LogFormat("AMQP Control Object registered with ID {0} => {1}", objRef.AmqpId, objRef.name);
    }

    /// <summary>
    /// unregisters an existing AMQP object control reference from the controller.
    /// </summary>
    /// <param name="objRef">The object control reference to unregister.</param>
    public void UnregisterObject(AmqpObjectControlReference objRef)
    {
        if (objRef == null) throw new System.ArgumentNullException("objRef");

        // Ensure this reference has been filled out properly
        if (string.IsNullOrEmpty(objRef.AmqpId))
        {
            Debug.LogWarningFormat("AMQP Control Object Reference is missing its ID: {0}", objRef.name);
            return;
        }

        if (objectsById.ContainsKey(objRef.AmqpId))
        {
            objectsById.Remove(objRef.AmqpId);
            if (DebugLogMessages) Debug.LogFormat("AMQP Control Object Reference unregistere {0}", objRef.AmqpId);
        }
    }

    /**
     * Handles messages receieved from this object's subscription based on the exchange name,
     * exchange type, and routing key used. You could also write an anonymous delegate in line
     * when creating the subscription like: (received) => { Debug.Log(received.Message.Body.Length); }
     */
    void HandleExchangeMessageReceived(AmqpExchangeReceivedMessage received)
    {
        // First convert the message's body, which is a byte array, into a string for parsing the JSON
        var receivedJson = System.Text.Encoding.UTF8.GetString(received.Message.Body);

        Debug.Log(receivedJson);

        /**
         *  Parse the JSON message
         *  This example uses the SimpleJSON parser which is included in the AMQP library.
         *  You can find out more about this parser here: http://wiki.unity3d.com/index.php/SimpleJSON
        */

        // If this starts with a bracket, it's an array of messages, so decode separately
        if (receivedJson.StartsWith("["))
        {
            var msgList = JSON.Parse(receivedJson).AsArray;

            for (var i = 0; i < msgList.Count; i++)
            {
                var msg = msgList[i];
                UpdateObject(msg);
            }
        }

        // Otherwise it's an individual message so decode individually
        else
        {
            var msg = JSON.Parse(receivedJson);
            UpdateObject(msg);
        }
    }

    // Updates an object in the list with the given update message
    void UpdateObject(JSONNode msg)
    {
        // Get the message ID filter, if any
        var id = msg != null ? msg["id"].Value : null;

        if (string.IsNullOrEmpty(id))
        {
            if (DebugLogMessages) Debug.LogWarning("AMQP message received without 'id' property.");
            return;
        }

        // Get the object given its message ID
        if (!objectsById.ContainsKey(id))
        {
            if (DebugLogMessages) Debug.LogWarningFormat("No AMQP Object Control Reference found for ID: {0}.", id);
            return;
        }

        // Get the object reference for this ID
        var objRef = objectsById[id];

        if (UpdatePosition)
        {
            // If the property exists use its value, otherwise just use the current value
            var objPos = UpdateInWorldSpace ? objRef.transform.position : objRef.transform.localPosition;
            var posX = msg["posX"] != null ? msg["posX"].AsFloat : objPos.x;
            var posY = msg["posY"] != null ? msg["posY"].AsFloat : objPos.y;
            var posZ = msg["posZ"] != null ? msg["posZ"].AsFloat : objPos.z;

            // Update with new values
            if (UpdateInWorldSpace)
            {
                objRef.transform.position = new Vector3(posX, posY, posZ);
            }
            else
            {
                objRef.transform.localPosition = new Vector3(posX, posY, posZ);
            }
        }

        if (UpdateRotation)
        {
            // If the property exists use its value, otherwise just use the current value
            var objRot = UpdateInWorldSpace ? objRef.transform.eulerAngles : objRef.transform.localEulerAngles;
            var rotX = msg["rotX"] != null ? msg["rotX"].AsFloat : objRot.x;
            var rotY = msg["rotY"] != null ? msg["rotY"].AsFloat : objRot.y;
            var rotZ = msg["rotZ"] != null ? msg["rotZ"].AsFloat : objRot.z;

            // Update with new values
            if (UpdateInWorldSpace)
            {
                objRef.transform.eulerAngles = new Vector3(rotX, rotY, rotZ);
            }
            else
            {
                objRef.transform.localEulerAngles = new Vector3(rotX, rotY, rotZ);
            }
        }

        if (UpdateScale)
        {
            // If the property exists use its value, otherwise just use the current value
            var scaleX = msg["sclX"] != null ? msg["sclX"].AsFloat : objRef.transform.localScale.x;
            var scaleY = msg["sclY"] != null ? msg["sclY"].AsFloat : objRef.transform.localScale.y;
            var scaleZ = msg["sclZ"] != null ? msg["sclZ"].AsFloat : objRef.transform.localScale.z;

            // Update with new values
            objRef.transform.localScale = new Vector3(scaleX, scaleY, scaleZ);
        }
    }


    /// <summary>
    /// Publishes a message to the current exchange using the form's input values.
    /// </summary>
   

  

    #region Event Handlers

    // Handles a connection event
    void HandleConnected(AmqpClient client)
    {
        //Connection.interactable = false;
        //ConnectButton.interactable = false;
        //DisconnectButton.interactable = true;

        // Query exchange list
        AmqpClient.GetExchangesAsync(FinishConnected);
    }

    // Finishes the connection event by receiving the async result of the exchange query and display the results in the drop down
    void FinishConnected(AmqpExchange[] exchangeList)
    {
        // Copy list locally
        exchanges = exchangeList;

        //foreach (var exchange in exchanges)
        //{
        //    if (exchange.Name == null || exchange.Name == "/") continue;
        //    var option = new Dropdown.OptionData(exchange.Name);
        //    ExchangeName.options.Add(option);
        //    PublishExchange.options.Add(option);
        //}

        //if (exchanges.Length > 0)
        //{
        //    ExchangeName.RefreshShownValue();
        //    PublishExchange.RefreshShownValue();
        //}

        // Enable UI
        //ExchangeName.interactable = true;
        //RoutingKey.interactable = true;
        //SubscribeButton.interactable = true;
        //UnsubscribeButton.interactable = true;

        //PublishButton.interactable = true;
        //PublishExchange.interactable = true;
        //PublishMessage.interactable = true;
        //PublishRoutingKey.interactable = true;
    }

    // Handles a disconnection event
    void HandleDisconnected(AmqpClient client)
    {
        //Connection.interactable = true;
        //ConnectButton.interactable = true;
        //DisconnectButton.interactable = false;

        //ExchangeName.interactable = false;
        //RoutingKey.interactable = false;
        //SubscribeButton.interactable = false;
        //UnsubscribeButton.interactable = false;

        //PublishButton.interactable = false;
        //PublishExchange.interactable = false;
        //PublishMessage.interactable = false;
        //PublishRoutingKey.interactable = false;
    }

    // Handles a reconnecting event
    void HandleReconnecting(AmqpClient client)
    {

    }

    // Handles a blocked event
    void HandleBlocked(AmqpClient client)
    {

    }

    // Handles exchange subscribes
    void HandleExchangeSubscribed(AmqpExchangeSubscription subscription)
    {
        // Add it to the local list
        exSubscriptions.Add(subscription);
    }

    // Handles exchange unsubscribes
    void HandleExchangeUnsubscribed(AmqpExchangeSubscription subscription)
    {
        // Add it to the local list
        exSubscriptions.Remove(subscription);
    }

    #endregion Event Handlers

    #endregion Methods
}

}
`

@danammeansbear
Copy link
Author

so
I imagine you just use the AMQP publish method but I feel like I am missing something. I am an agriculture major so coding isnt exactly my fortay but i try.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant