Test-driven data engineering
A perspective on how to build data integrations and clients in a TDD fashion
After working as a Data Engineer for over a year, I’ve struggled with incorporating best practices from software engineering to the tasks now on my desk - learning new workflows, even though still code-driven, means reflecting on practices and figuring out how they fit into new workflows. One of the practices that I enjoy is Test-driven Development (TDD). I’ve recently cracked the problem, however, and in hindsight it seems obvious, yet it wasn't easy to find inspiration and resources online.
I am in no way dogmatic about TDD - and I don’t use it as much as I like, but it is a great tool that not only guards against regression but also drives great software design.
A simple TDD example
In prior jobs, TDD seemed more obvious; when building a component where the state is manageable (or part of the input if writing pure functions) it is more trivial to write tests for. I.e let’s say you want to write a function that removes duplicates:
def test_remove_duplicates():
assert remove_duplicates([1, 2, 3, 1]) == [1, 2, 3]
From that, I could generalize the test to make it parameterized and cover more test cases. Awesome.
It’s easy as we generally control the whole state. In some cases, I’d write a fake, or similar, if there are external parts we don’t care for. I.e we want to test an operation:
def test__add_new_payment(some_payment: Payment):
repository = InmemoryPaymentRepository()
controller = PaymentController(repository)
cottroller.Add(some_payment)
assert some_payment in repository.all()
The implementation of an InmemoryPaymentRepository
allows us to test the hypothetical PaymentController
, to ensure that it does its required state transitions. That’s generally speaking been my practice previously.
The messy parts
Translating this into an API or SQL Client seems messy. How do I test that? In previous employments, my repositories have been sufficiently simple, that the need to write complex tests to validate that SELECT * FROM payments
worked as expected, seemed excessive. Overall I’ve tried to create as slim clients, as possible to limit what was un-testable. However, when developing a somewhat complex data platform, the complexity of those integrations grows, and is less trivial to test, as there is less control of the underlying database.
I could, in theory, write an integration test that tests with the live source system, however, that doesn’t work in a CI pipeline, and tests outside of CI pipelines are rarely run (in my experience).
So what do I do?
The solution
Which might seem trivial, and/or might be flawed (I am hoping to receive feedback in the comments!), came about when I noticed a bug in one of our API clients. We received data outside of the date range we expected! Let’s say the API was Yr’s weather API.
Now, I wanted to have SOME sort of test, because we’d recently been refactoring this part of the code, and I suspected it might be a regression.
Having been reading Kent Beck’s great blog (as linked to a few times already), I was inspired to crack “How to test this”.
So I had two things I needed to verify:
Do we make the correct request to the API?
Do we convert the returned JSON correctly into a storage-able format?
To write tests, I needed a way to control the input and output from the API. I could use ugly monkey patching, but I’d prefer not to. Writing the ideal test instead, and then moving back from that, I realized I needed to abstract out the actual HTTP interactions, which made it easier to use a Mock:
def test_makes_correct_api_request():
http_client = MockedHttpClient()
subject = YrClient(http_client)
position = Position(lat=59.93,lon=10.72)
expected_url = f"https://api.met.no/weatherapi/locationforecast/2.0/complete?lat=59.93&lon=10.72"
_ = subject.get_weather_at(position)
http_client.get.assert_called_with(url=expected_url)
The above case was rather simple, however, it did test that we created the correct URL. This edges on testing implementation details, however ensuring that the URL is correctly formatted is a crucial aspect of ensuring that the client hasn’t regressed.
Similarly, I wanted to ensure that the method deserializes the output of the API correctly, and can handle it. By simply calling the above API manually I can get valid test data. An API like the above, will return a large set of entries, so to simplify the test I’d remove most of the entries in the time series, and remove properties in the dictionary that our client doesn’t read, yet still have more than one, to ensure that values are correctly mapped.
def test_deserializes_api_response(some_position: Position):
http_client = MockedHttpClient()
http_client.get.result = {
"properties": {
"timeseries": [
{...},
{...},
],
},
}
subject = YrClient(http_client)
result = subject.get_weather_at(some_position)
assert result == [
WeatherTimeseriesEntry(...),
WeatherTimeseriesEntry(...),
]
This test will naturally be obsolete if the API changes - and one could ask what value it then provides. Nevertheless, the test is both documentation of how the API is supposed to work, for future developers to understand, and also act as a contract of how we expect the API to work, both making changes more obvious and acting as a source of communication if the API is internally developed. And most importantly; if I were to refactor the YrClient, I can rest assured, that the relation.
This also created a tidy-ing where I moved all the HTTP and JSON parts out of the YrClient
and into its own abstraction. A tiny class indeed, which is why I never thought to create it, however, it minimizes the responsibilities of the YrClient
, and made it possible to streamline the exception handling regarding HTTP Status Codes throughout our data platform. Oh, and it ensures the client doesn’t break in subsequent changes.
Similar tests can be made regarding SQL sources, and if utilizing data frames should make things possible. Whether it makes sense, naturally depends on the complexity of the client, and which parts could introduce bugs.
If one generates SQL queries based on some properties, it might make sense to test this generation independently too; i.e:
def test_sql_client_creates_correct_query():
sql_client = MockSqlClient()
finance_client = FinanceClient(sql_client)
finance_client.get_payments_in_range(datetime(2020,1,1), datetime(2021,1,1))
sql_client.query.assert_called_with(query="SELECT * FROM payments WHERE payment_date < :start AND payment_date <= :end, start="2020-01-01", end="2021-01-01)
Further improvement of testability
This does, however, highlight a potential design smell; because what are we testing above? it isn’t the FinanceClient
as a whole, nor the get_payments_in_range
function, but merely a potential get_sql_query_for_payments_partition
- and we should probably find a way to make that atomic. A test like the following would probably highlight the abstraction, which we could move the functionality into:
def test_get_sql_query_for_payments_partition():
sql_client = MockSqlClient()
finance_client = FinanceClient(sql_client)
assert get_sql_query_for_payments_partition(datetime(2020,1,1), datetime(2021,1,1)) == Query(query="SELECT * FROM payments WHERE payment_date < :start AND payment_date <= :end, start="2020-01-01", end="2021-01-01)
As I wanted to preserve the parameters, rather than using string interpolation, I had to create a Query object, however, that might also make it easier to pass around eventually; it at least made the step testable, which is nice.
Conclusion
There are probably further refinements and reflections to be had, however, the minor realization did remove some fear of regression, and improve confidence in the system as a whole.