Skip to content

Commit

Permalink
Skip geocoding for records with empty addresses
Browse files Browse the repository at this point in the history
Geocoding these addresses with Smarty now triggers a hard failure.

(cherry picked from commit 2e9ae09)
  • Loading branch information
emk committed May 17, 2023
1 parent dac8def commit 1438afc
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 9 deletions.
6 changes: 6 additions & 0 deletions src/addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ pub struct Address {
}

impl Address {
/// A valid `Address` has a non-empty `street` field. (And whitespace
/// doesn't count as non-empty.)
pub fn is_valid(&self) -> bool {
!self.street.trim().is_empty()
}

/// The `city` field, or an empty string.
pub fn city_str(&self) -> &str {
self.city.as_ref().map(|s| &s[..]).unwrap_or("")
Expand Down
2 changes: 1 addition & 1 deletion src/geocoders/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Geocoder for Cache {
// Encode our value for caching.
let value = retry.as_ref().map(|retry| &retry.column_values);
encoded.clear();
bincode::encode_into_std_write(&value, &mut encoded, bincode_config)
bincode::encode_into_std_write(value, &mut encoded, bincode_config)
.context("could not encode value for caching")?;

// Compress our encoded value and add it to our pipeline set.
Expand Down
72 changes: 72 additions & 0 deletions src/geocoders/invalid_record_skipper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//! Skip invalid addresses and don't pass them through to the next layer.

use async_trait::async_trait;
use metrics::{counter, describe_counter};

use crate::addresses::Address;

use super::{Geocoded, Geocoder, Result};

/// Skip invalid addresses and don't pass them through to the next layer.
pub struct InvalidRecordSkipper {
// Our inner geocoder that we're normalizing for.
inner: Box<dyn Geocoder>,
}

impl InvalidRecordSkipper {
/// Create a new `Normalizer` wrapping the specified geocoder.
pub fn new(inner: Box<dyn Geocoder>) -> InvalidRecordSkipper {
describe_counter!(
"geocodecsv.invalid_records.total",
"Invalid records which could not be geocoded"
);

InvalidRecordSkipper { inner }
}
}

#[async_trait]
impl Geocoder for InvalidRecordSkipper {
fn tag(&self) -> &str {
// We don't change anything which could possibly affect caching,
// so we can just use our inner tag.
self.inner.tag()
}

fn configuration_key(&self) -> &str {
self.inner.configuration_key()
}

fn column_names(&self) -> &[String] {
self.inner.column_names()
}

async fn geocode_addresses(
&self,
addresses: &[Address],
) -> Result<Vec<Option<Geocoded>>> {
// Extract our valid addresses, keeping track of their original
// positions.
let mut original_indices = vec![];
let mut valid_addresses = vec![];
for (i, address) in addresses.iter().enumerate() {
if address.is_valid() {
valid_addresses.push(address.clone());
original_indices.push(i);
} else {
counter!("geocodecsv.invalid_records.total", 1);
}
}

// Geocode our valid addresses.
let geocodeded = self.inner.geocode_addresses(&valid_addresses).await?;

// Rebuild our geocoded addresses, inserting `None` for invalid.
let mut result = vec![None; addresses.len()];
for (i, geocoded) in geocodeded.into_iter().enumerate() {
let original_index = original_indices[i];
result[original_index] = geocoded;
}
Ok(result)
}
}
6 changes: 5 additions & 1 deletion src/geocoders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
};

pub mod cache;
pub mod invalid_record_skipper;
pub mod libpostal;
pub mod normalizer;
pub mod smarty;
Expand Down Expand Up @@ -55,6 +56,9 @@ pub enum MatchStrategy {
Enhanced,
}

// `#derive(Default)` would actually do the right thing, but we want to make the
// default explicit for the reader.
#[allow(clippy::derivable_impls)]
impl Default for MatchStrategy {
fn default() -> Self {
MatchStrategy::Strict
Expand Down Expand Up @@ -122,7 +126,7 @@ pub trait Geocoder: Send + Sync + 'static {
let mut hasher = Sha256::new();
for column_name in self.column_names() {
hasher.update(column_name.as_bytes());
hasher.update(&[0]);
hasher.update([0]);
}
hasher.update(self.configuration_key());
let hash = hasher.finalize();
Expand Down
16 changes: 9 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@

pub use anyhow::Result;
use anyhow::{format_err, Error};
use geocoders::cache::Cache;
use geocoders::libpostal::LibPostal;
use geocoders::normalizer::Normalizer;
use geocoders::smarty::Smarty;
use geocoders::{shared_http_client, Geocoder};
use key_value_stores::KeyValueStore;
use leaky_bucket::RateLimiter;
use metrics::describe_counter;
use opinionated_metrics::Mode;
Expand All @@ -37,7 +31,12 @@ mod pipeline;
mod unpack_vec;

use crate::addresses::AddressColumnSpec;
use crate::geocoders::MatchStrategy;
use crate::geocoders::{
cache::Cache, invalid_record_skipper::InvalidRecordSkipper, libpostal::LibPostal,
normalizer::Normalizer, shared_http_client, smarty::Smarty, Geocoder,
MatchStrategy,
};
use crate::key_value_stores::KeyValueStore;
use crate::pipeline::{geocode_stdio, OnDuplicateColumns, CONCURRENCY, GEOCODE_SIZE};

/// Underlying geocoders we can use. (Helper struct for argument parsing.)
Expand Down Expand Up @@ -237,6 +236,9 @@ async fn main() -> Result<()> {
geocoder = Box::new(Normalizer::new(geocoder));
}

// Always skip invalid records.
geocoder = Box::new(InvalidRecordSkipper::new(geocoder));

// Call our geocoder.
let result = geocode_stdio(
spec,
Expand Down
38 changes: 38 additions & 0 deletions tests/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,41 @@ fn rate_limiter() {
.expect_success();
assert!(output.stdout_str().contains("shipping_addressee"));
}

#[test]
#[ignore]
fn skip_records_with_empty_house_number_and_street() {
let testdir = TestDir::new(
"geocode-csv",
"skip_records_with_empty_house_number_and_street",
);

testdir.create_file(
"spec.json",
r#"{
"shipping": {
"house_number_and_street": [
"address_1",
"address_2"
],
"postcode": "zip_code"
}
}"#,
);

let output = testdir
.cmd()
.arg("--license=us-core-enterprise-cloud")
.arg("--spec=spec.json")
.output_with_stdin(
r#"address_1,address_2,city,state,zip_code
,,New York,NY,10118
, ,Provo,UT,
"#,
)
.expect_success();
// We output all lines, without geocoding any that lack a street address.
assert!(output.stdout_str().contains("shipping_addressee"));
assert!(output.stdout_str().contains("New York"));
assert!(output.stdout_str().contains("Provo"));
}

0 comments on commit 1438afc

Please sign in to comment.