Skip to content

Commit

Permalink
Misc: internal state endpoint, bug fixes for remote start transaction…
Browse files Browse the repository at this point in the history
…, and QoL changes
  • Loading branch information
Erik Siebert committed Oct 14, 2024
1 parent 62ec49b commit 48266b5
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 48 deletions.
130 changes: 84 additions & 46 deletions fake_charging_station/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,36 @@ async def rejected_request_exception_handler(
return JSONResponse(status_code=status.HTTP_409_CONFLICT, content={"detail": exc.message})


@app.get("/fcs/connector/{connector_id}/status", status_code=status.HTTP_200_OK)
async def send_status(connector_id: int = 1) -> dict[str, str]:
@app.post(
"/fcs/connector/{connector_id}/status",
status_code=status.HTTP_204_NO_CONTENT,
tags=["connector"],
)
async def send_status_notification_to_csms(connector_id: int = 1):
"""Send status notification to CSMS.
**connector_id**: The connector id for which the status notification will be sent
\f Truncate output for OpenAPI doc
Args:
connector_id: The ID of the connector
"""
await app.FCS.send_status_notification(connector_id=connector_id) # type: ignore[attr-defined]
return {"message": "Sending status"}


@app.get("/fcs/connector/{connector_id}/plugin", status_code=status.HTTP_200_OK)
async def plugin(connector_id: int = 1, rfid: str | None = None) -> dict[str, str]:
@app.post(
"/fcs/connector/{connector_id}/plugin",
status_code=status.HTTP_204_NO_CONTENT,
tags=["connector"],
)
async def plugin_connector(connector_id: int = 1, rfid: str | None = None):
"""Plug in a connector, and authenticate if RFID is given.
Make sure the RFID is a know driver in the CSMS.
**connector_id**: The connector id to be plugged in
**rfid**: If given, this RFID will be used to authenticate against the CSMS.
Make sure the RFID is a know driver in the CSMS.
\f Truncate output for OpenAPI doc
Expand All @@ -84,14 +96,20 @@ async def plugin(connector_id: int = 1, rfid: str | None = None) -> dict[str, st
If given, tries to authenticate and starts a charging session
"""
await app.FCS.plug_in(rfid=rfid, connector_id=connector_id) # type: ignore[attr-defined]
return {"message": "Plugging in"}


@app.get("/fcs/connector/{connector_id}/start", status_code=status.HTTP_200_OK)
async def start(connector_id: int = 1, rfid: str = "12341234") -> dict[str, str]:
@app.post(
"/fcs/connector/{connector_id}/start",
status_code=status.HTTP_204_NO_CONTENT,
tags=["connector"],
)
async def start_transaction(connector_id: int = 1, rfid: str = "12341234"):
"""Authenticates and starts transaction at a connector.
Make sure the RFID is a know driver in the CSMS.
**connector_id**: The connector id in which the transaction will start
**rfid**: The RFID used to authenticate against the CSMS.
Make sure the RFID is a know driver in the CSMS.
\f Truncate output for OpenAPI doc
Expand All @@ -100,18 +118,23 @@ async def start(connector_id: int = 1, rfid: str = "12341234") -> dict[str, str]
rfid: RFID used to authenticate against the CSMS
"""
await app.FCS.send_auth_start(connector_id=connector_id, rfid=rfid) # type: ignore[attr-defined]
return {"message": "Authenticating and starting transaction"}


@app.get("/fcs/connector/{connector_id}/send_charging_profile", status_code=status.HTTP_200_OK)
async def send_charging_profile(connector_id: int = 1, limit: int = 100) -> dict[str, str]:
@app.post(
"/fcs/connector/{connector_id}/set_charging_profile",
status_code=status.HTTP_204_NO_CONTENT,
tags=["connector"],
)
async def set_charging_profile(connector_id: int = 1, limit: int = 100):
"""Send a charging profile with a limit in W.
Limits have different effects on the EV's behavior:
- Limit > 0 -> EV will charge with the amount of W every meter value interval
- Limit == 0 -> EV will stop charging and SuspendedEVSE will be reported
- Limit == -1 -> EV will stop charging and SuspendedEV will be reported
- Limit == -2 -> EV will stop charging and Finishing will be reported
**connector_id**: The connector id to which the charging profile will be set
**limit**: Limit of the charging profile in W. Different values have different behaviors:
- Positive limits -> EV will charge with the amount of W every meter value interval
- Zero limit -> EV will stop charging and SuspendedEVSE will be reported
- Limit -1 -> EV will stop charging and SuspendedEV will be reported
- Limit -2 -> EV will stop charging and Finishing will be reported
\f Truncate output for OpenAPI doc
Expand All @@ -133,27 +156,19 @@ async def send_charging_profile(connector_id: int = 1, limit: int = 100) -> dict
)

await app.FCS.after_set_charging_profile(connector_id=connector_id) # type: ignore[attr-defined]
return {"message": "Charging profile sent"}


@app.post("/fcs/data_transfer", status_code=status.HTTP_200_OK)
async def send_data_transfer(payload: dict[str, object] = {}) -> dict[str, str]:
"""Send a DataTransfer payload to the CSMS.
\f Truncate output for OpenAPI doc
Args:
payload: Payload to be sent in a DataTransfer request
"""
await app.FCS.send_data_transfer(payload=payload) # type: ignore[attr-defined]
return {"message": "Sending data transfer payload"}


@app.get("/fcs/connector/{connector_id}/stop", status_code=status.HTTP_200_OK)
async def stop(connector_id: int = 1, reason: str | None = None) -> dict[str, str]:
@app.post(
"/fcs/connector/{connector_id}/stop",
status_code=status.HTTP_204_NO_CONTENT,
tags=["connector"],
)
async def stop_transaction(connector_id: int = 1, reason: str | None = None):
"""Stop transaction at a connector.
In order to also unplug the EV, pass `reason=EVDisconnected`
**connector_id**: The connector id in which the transaction will stop
**reason**: If `reason=EVDisconnected`, also unplugs the EV
\f Truncate output for OpenAPI doc
Expand All @@ -162,14 +177,20 @@ async def stop(connector_id: int = 1, reason: str | None = None) -> dict[str, st
reason: Reason for stopping the transaction. Defaults to None.
"""
await app.FCS.send_stop_transaction(connector_id=connector_id, reason=reason) # type: ignore[attr-defined]
return {"message": "Stopping transaction"}


@app.get("/fcs/connector/{connector_id}/unplug", status_code=status.HTTP_200_OK)
async def unplug(connector_id: int = 1, stop_tx: bool = True) -> dict[str, str]:
@app.post(
"/fcs/connector/{connector_id}/unplug",
status_code=status.HTTP_204_NO_CONTENT,
tags=["connector"],
)
async def unplug_connector(connector_id: int = 1, stop_tx: bool = True):
"""Unplug a connector.
Also sends a StopTransaction if it wasn't remotely stopped.
**connector_id**: The connector id to be plugged out
**stop_tx**: Whether to send a StopTransaction after unplugging connector.
Can be set to `False` to simulate chargers that send this message later.
\f Truncate output for OpenAPI doc
Expand All @@ -179,18 +200,35 @@ async def unplug(connector_id: int = 1, stop_tx: bool = True) -> dict[str, str]:
Defaults to True.
"""
await app.FCS.unplug(connector_id=connector_id, stop_tx=stop_tx) # type: ignore[attr-defined]
return {"message": "Unplugging"}


@app.get("/fcs/disc", status_code=status.HTTP_200_OK)
async def disconnect() -> dict[str, str]:
@app.post("/fcs/data_transfer", status_code=status.HTTP_204_NO_CONTENT, tags=["charging_station"])
async def send_data_transfer(payload: dict[str, object] = {}):
"""Send a DataTransfer payload to the CSMS.
**payload**: Payload to be sent in a DataTransfer request
\f Truncate output for OpenAPI doc
Args:
payload: Payload to be sent in a DataTransfer request
"""
await app.FCS.send_data_transfer(payload=payload) # type: ignore[attr-defined]


@app.get("/fcs/internal_state", status_code=status.HTTP_200_OK, tags=["charging_station"])
async def get_internal_state():
"""Get the internal state of the fake charging station, useful for debugging."""
return {"state": app.FCS.to_dict()} # type: ignore[attr-defined]


@app.post("/fcs/disc", status_code=status.HTTP_204_NO_CONTENT, tags=["charging_station"])
async def disconnect_from_csms():
"""Disconnect the FCS from the CSMS."""
await app.FCS.disconnect() # type: ignore[attr-defined]
return {"message": "Disconnecting"}


@app.get("/fcs/shutdown", status_code=status.HTTP_200_OK)
async def shutdown() -> dict[str, str]:
@app.post("/fcs/shutdown", status_code=status.HTTP_204_NO_CONTENT, tags=["instance"])
async def shutdown_instance():
"""Shutdown the FCS instance."""
os.kill(os.getpid(), signal.SIGTERM)
return {"message": "Shutting down"}
27 changes: 27 additions & 0 deletions fake_charging_station/fcs_v16/fcs_v16.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ async def on_remote_start_transaction(
if connector_id is None:
return call_result.RemoteStartTransactionPayload(status=RemoteStartStopStatus.rejected)
self.connectors[connector_id].id_tag = id_tag
self.connectors[connector_id].plugged_in = True
return call_result.RemoteStartTransactionPayload(status=RemoteStartStopStatus.accepted)

@after(Action.RemoteStartTransaction)
Expand Down Expand Up @@ -426,6 +427,32 @@ async def send_data_transfer(self, payload: dict[str, object] = {}):
LOGGER.debug("Sending DataTransfer")
return await self.call(request)

def to_dict(self) -> dict[str, object]:
"""Returns self in dictionary format."""
return {
"id": self.id,
"configuration": self.configuration,
"transaction_connector": self.transaction_connector,
"connected": self.connected,
"tx_start_charge": self.tx_start_charge,
"connectors": [
{
"connector_id": c.id,
"change_to_unavailable": c.change_to_unavailable,
"pending_stop_tx": c.pending_stop_tx,
"id_tag": c.id_tag,
"transaction_id": c.transaction_id,
"energy_import_register": c.energy_import_register,
"power_offered": c.power_offered,
"error_code": c.error_code,
"plugged_in": c.plugged_in,
"already_stopped": c.already_stopped,
"status": c.status,
}
for c in self.connectors.values()
],
}


async def get_fcs(settings: Settings) -> FakeChargingStation:
"""Build a FCS instance."""
Expand Down
4 changes: 2 additions & 2 deletions tests/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@


async def test_endpoint(async_client: AsyncClient) -> None:
response = await async_client.get("/fcs/connector/1/status")
response = await async_client.post("/fcs/connector/1/status")

assert response.status_code == status.HTTP_200_OK
assert response.status_code == status.HTTP_204_NO_CONTENT

0 comments on commit 48266b5

Please sign in to comment.